Multithread symlink forest removal.

This is a second attempt at aosp/2273288, which was rolled back because
it turned out that:

1. We make ~120K symlinks in AOSP (!), all of which need to be deleted
2. System calls are sometimes slow
3. Golang spawns a new OS-level thread for each blocking system call to
   keep cores busy

All this together meant that we sometimes had 10K system calls in
flight, and therefore 10K OS-level threads, at which point Go gives up
and says "I created too many threads, please help".
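
Roughly the shape of code that hits this limit (a hedged sketch, not
the rolled-back change itself: removeAllUnbounded and the flat path
list are invented, and the os and sync imports are assumed):

    // One goroutine per path: each goroutine that blocks in
    // unlink(2) pins an OS-level thread, so slow syscalls times
    // ~120K paths can blow past the runtime's thread limit.
    func removeAllUnbounded(paths []string) {
        var wg sync.WaitGroup
        wg.Add(len(paths))
        for _, p := range paths {
            go func(p string) {
                defer wg.Done()
                os.Remove(p) // blocking syscall
            }(p)
        }
        wg.Wait()
    }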

The fix is to move the system calls into a pool of goroutines, which
soon end up on a pool of OS-level threads (since they mostly do
blocking system calls).

This is also why the package-level fs variable in the tests is renamed
to testFs: symlink_forest.go now imports io/fs, and Go does not allow
the same identifier to be declared both in the package block (the
variable) and in a file block (the import).
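
In miniature, this is how the pool added to symlink_forest.go below is
meant to be driven (a hedged sketch: removeAllPooled and the flat path
list are invented, the real code walks a directory tree, and the os
and sync imports are assumed):

    // Callers may spawn as many goroutines as they like; a goroutine
    // blocked handing work to the pool is parked by the scheduler
    // without pinning an OS thread. Only the 100 pool workers ever
    // sit in blocking syscalls.
    func removeAllPooled(paths []string) {
        pool := createSyscallPool(100)
        defer pool.shutdown() // runs after wg.Wait() returns
        var wg sync.WaitGroup
        wg.Add(len(paths))
        for _, p := range paths {
            go func(p string) {
                defer wg.Done()
                pool.do(func() { os.Remove(p) }) // blocks until done
            }(p)
        }
        wg.Wait()
    }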

Test: Presubmits.
Change-Id: Ia9aefff3b0ed373f09bb6c8b2ec1d8b0f00b213b
diff --git a/bp2build/java_binary_host_conversion_test.go b/bp2build/java_binary_host_conversion_test.go
index c860844..e8551e5 100644
--- a/bp2build/java_binary_host_conversion_test.go
+++ b/bp2build/java_binary_host_conversion_test.go
@@ -33,7 +33,7 @@
 	}, tc)
 }
 
-var fs = map[string]string{
+var testFs = map[string]string{
 	"test.mf": "Main-Class: com.android.test.MainClass",
 	"other/Android.bp": `cc_library_host_shared {
     name: "jni-lib-1",
@@ -44,7 +44,7 @@
 func TestJavaBinaryHost(t *testing.T) {
 	runJavaBinaryHostTestCase(t, Bp2buildTestCase{
 		Description: "java_binary_host with srcs, exclude_srcs, jni_libs, javacflags, and manifest.",
-		Filesystem:  fs,
+		Filesystem:  testFs,
 		Blueprint: `java_binary_host {
     name: "java-binary-host-1",
     srcs: ["a.java", "b.java"],
@@ -77,7 +77,7 @@
 func TestJavaBinaryHostRuntimeDeps(t *testing.T) {
 	runJavaBinaryHostTestCase(t, Bp2buildTestCase{
 		Description: "java_binary_host with srcs, exclude_srcs, jni_libs, javacflags, and manifest.",
-		Filesystem:  fs,
+		Filesystem:  testFs,
 		Blueprint: `java_binary_host {
     name: "java-binary-host-1",
     static_libs: ["java-dep-1"],
@@ -107,7 +107,7 @@
 func TestJavaBinaryHostLibs(t *testing.T) {
 	runJavaBinaryHostTestCase(t, Bp2buildTestCase{
 		Description: "java_binary_host with srcs, libs.",
-		Filesystem:  fs,
+		Filesystem:  testFs,
 		Blueprint: `java_binary_host {
     name: "java-binary-host-libs",
     libs: ["java-lib-dep-1"],
diff --git a/bp2build/symlink_forest.go b/bp2build/symlink_forest.go
index 45817e3..81ec7ee 100644
--- a/bp2build/symlink_forest.go
+++ b/bp2build/symlink_forest.go
@@ -15,7 +15,9 @@
 package bp2build
 
 import (
+	"errors"
 	"fmt"
+	"io/fs"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -49,6 +51,61 @@
 	okay  atomic.Bool // Whether the forest was successfully constructed
 }
 
+// A simple thread pool to limit concurrency on system calls.
+// Necessary because Go spawns a new OS-level thread for each blocking system
+// call. This means that if syscalls are too slow and there are too many of
+// them, the hard limit on OS-level threads can be exhausted.
+type syscallPool struct {
+	shutdownCh []chan<- struct{}
+	workCh     chan syscall
+}
+
+type syscall struct {
+	work func()
+	done chan<- struct{}
+}
+
+func createSyscallPool(count int) *syscallPool {
+	result := &syscallPool{
+		shutdownCh: make([]chan<- struct{}, count),
+		workCh:     make(chan syscall),
+	}
+
+	for i := 0; i < count; i++ {
+		shutdownCh := make(chan struct{})
+		result.shutdownCh[i] = shutdownCh
+		go result.worker(shutdownCh)
+	}
+
+	return result
+}
+
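+// do hands work to one of the pooled goroutines and blocks until it has
+// finished; only the pool's workers ever execute the (blocking) syscalls.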
+func (p *syscallPool) do(work func()) {
+	doneCh := make(chan struct{})
+	p.workCh <- syscall{work, doneCh}
+	<-doneCh
+}
+
+func (p *syscallPool) shutdown() {
+	for _, ch := range p.shutdownCh {
+		ch <- struct{}{} // Blocks until the value is received
+	}
+}
+
+func (p *syscallPool) worker(shutdownCh <-chan struct{}) {
+	for {
+		select {
+		case <-shutdownCh:
+			return
+		case work := <-p.workCh:
+			work.work()
+			work.done <- struct{}{}
+		}
+	}
+}
+
 // Ensures that the node for the given path exists in the tree and returns it.
 func ensureNodeExists(root *instructionsNode, path string) *instructionsNode {
 	if path == "" {
@@ -317,6 +374,56 @@
 	}
 }
 
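+// removeParallelRecursive removes the tree rooted at path, spawning one
+// goroutine per directory child and pushing every unlink/rmdir syscall
+// onto the pool. It signals wg once the whole subtree is gone.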
+func removeParallelRecursive(pool *syscallPool, path string, fi os.FileInfo, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	if fi.IsDir() {
+		children := readdirToMap(path)
+		childrenWg := &sync.WaitGroup{}
+		childrenWg.Add(len(children))
+
+		for child, childFi := range children {
+			go removeParallelRecursive(pool, shared.JoinPath(path, child), childFi, childrenWg)
+		}
+
+		childrenWg.Wait()
+	}
+
+	pool.do(func() {
+		if err := os.Remove(path); err != nil {
+			fmt.Fprintf(os.Stderr, "Cannot unlink '%s': %s\n", path, err)
+			os.Exit(1)
+		}
+	})
+}
+
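+// removeParallel removes path and everything under it, capping the number
+// of concurrent removal syscalls with a fixed-size pool of goroutines.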
+func removeParallel(path string) {
+	fi, err := os.Lstat(path)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return
+		}
+
+		fmt.Fprintf(os.Stderr, "Cannot lstat '%s': %s\n", path, err)
+		os.Exit(1)
+	}
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	// Random guess as to the best number of syscalls to run in parallel
+	pool := createSyscallPool(100)
+	removeParallelRecursive(pool, path, fi, wg)
+	pool.shutdown()
+
+	wg.Wait()
+}
+
 // Creates a symlink forest by merging the directory tree at "buildFiles" and
 // "srcDir" while excluding paths listed in "exclude". Returns the set of paths
 // under srcDir on which readdir() had to be called to produce the symlink
@@ -330,7 +437,7 @@
 
 	context.okay.Store(true)
 
-	os.RemoveAll(shared.JoinPath(topdir, forest))
+	removeParallel(shared.JoinPath(topdir, forest))
 
 	instructions := instructionsFromExcludePathList(exclude)
 	go func() {