Revert "Refactor rate_limit.go for more clarify"

This reverts commit 69f3b3e9460f4ba41fd82c005311c8c50c88936a.

Caused rare deadlocks.
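
For reference, the restored API pairs each RequestExecution grant with a
Finish call recording the size of the buffer it produced, and a later
Release once that buffer has been written out. A minimal usage sketch
(compress, write, and data are illustrative placeholders, not part of
this change):

    limit := NewRateLimit(0, 0) // defaults: NumCPU executions, 512MB outstanding
    defer limit.Stop()

    exec := limit.RequestExecution() // blocks until a slot is free
    buf := compress(data)            // the bounded work
    exec.Finish(len(buf))            // free the slot; record the buffer size
    write(buf)                       // write the buffer out
    limit.Release(len(buf))          // buffer written; credit the memory back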

Bug: 64536066
Bug: 64813447
Change-Id: Ieb1b931bb2c0afdd8bf8edbfc32c373df3c08d8d
diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 04102b7..9e95bc1 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,54 +15,71 @@
 package main
 
 import (
-	"fmt"
 	"runtime"
 )
 
 type RateLimit struct {
-	requests    chan request
-	completions chan int64
-
-	stop chan struct{}
+	requests chan struct{}
+	finished chan int
+	released chan int
+	stop     chan struct{}
 }
 
-type request struct {
-	size     int64
-	serviced chan struct{}
-}
-
-// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
-// except when no capacity is in use, in which case the first caller is always permitted
-func NewRateLimit(capacity int64) *RateLimit {
-	ret := &RateLimit{
-		requests:    make(chan request),
-		completions: make(chan int64),
-
-		stop: make(chan struct{}),
+// NewRateLimit starts a new rate limiter that allows up to maxExecs
+// executions to run at a time. If maxExecs is <= 0, it defaults to the
+// number of logical CPUs on the system.
+//
+// With Finish and Release, we'll keep track of the outstanding buffer
+// sizes to be written. If their total goes above maxMem, we'll prevent
+// starting new executions.
+//
+// The total memory use may be higher due to current executions. This just
+// prevents runaway memory use due to slower writes.
+func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
+	if maxExecs <= 0 {
+		maxExecs = runtime.NumCPU()
+	}
+	if maxMem <= 0 {
+		// Default to 512MB
+		maxMem = 512 * 1024 * 1024
 	}
 
-	go ret.monitorChannels(capacity)
+	ret := &RateLimit{
+		requests: make(chan struct{}),
+
+	// Let all of the pending executions mark themselves as finished,
+		// even if our goroutine isn't processing input.
+		finished: make(chan int, maxExecs),
+
+		released: make(chan int),
+		stop:     make(chan struct{}),
+	}
+
+	go ret.goFunc(maxExecs, maxMem)
 
 	return ret
 }
 
-// RequestExecution blocks until another execution of size <size> can be allowed to run.
-func (r *RateLimit) Request(size int64) {
-	request := request{
-		size:     size,
-		serviced: make(chan struct{}, 1),
-	}
-
-	// wait for the request to be received
-	r.requests <- request
-
-	// wait for the request to be accepted
-	<-request.serviced
+// RequestExecution blocks until another execution can be allowed to run.
+func (r *RateLimit) RequestExecution() Execution {
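+	// Block until the monitor goroutine grants a slot by sending on requests.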
+	<-r.requests
+	return r.finished
 }
 
-// Finish declares the completion of an execution of size <size>
-func (r *RateLimit) Finish(size int64) {
-	r.completions <- size
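+// Execution represents a granted execution slot: the send side of the
+// limiter's finished channel, buffered to maxExecs so Finish won't block.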
+type Execution chan<- int
+
+// Finish will mark your execution as finished, and allow another request to be
+// approved.
+//
+// bufferSize may be specified to count memory buffer sizes, and must be
+// matched with calls to RateLimit.Release to mark the buffers as released.
+func (e Execution) Finish(bufferSize int) {
+	e <- bufferSize
+}
+
+// Call Release when finished with a buffer recorded with Finish.
+func (r *RateLimit) Release(bufferSize int) {
+	r.released <- bufferSize
 }
 
 // Stop the background goroutine
@@ -70,83 +87,29 @@
 	close(r.stop)
 }
 
-// monitorChannels processes incoming requests from channels
-func (r *RateLimit) monitorChannels(capacity int64) {
-	var usedCapacity int64
-	var currentRequest *request
+func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
+	var curExecs int
+	var curMemory int64
 
 	for {
-		var requests chan request
-		if currentRequest == nil {
-			// If we don't already have a queued request, then we should check for a new request
+		var requests chan struct{}
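+		// Leave requests nil when at the execution or memory limit; a nil
+		// channel is never ready, which disables that case in the select below.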
+		if curExecs < maxExecs && curMemory < maxMem {
 			requests = r.requests
 		}
 
 		select {
-		case newRequest := <-requests:
-			currentRequest = &newRequest
-			break
-		case amountCompleted := <-r.completions:
-			usedCapacity -= amountCompleted
-			if usedCapacity < 0 {
-				panic(fmt.Sprintf("usedCapacity < 0: %v", usedCapacity))
+		case requests <- struct{}{}:
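+			// A successful send grants a slot to one caller in RequestExecution.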
+			curExecs++
+		case amount := <-r.finished:
+			curExecs--
+			curMemory += int64(amount)
+			if curExecs < 0 {
+				panic("curExecs < 0")
 			}
+		case amount := <-r.released:
+			curMemory -= int64(amount)
 		case <-r.stop:
 			return
 		}
-
-		if currentRequest != nil {
-			accepted := false
-			if usedCapacity == 0 {
-				accepted = true
-			} else {
-				if capacity >= usedCapacity+currentRequest.size {
-					accepted = true
-				}
-			}
-			if accepted {
-				usedCapacity += currentRequest.size
-				currentRequest.serviced <- struct{}{}
-				currentRequest = nil
-			}
-		}
 	}
 }
-
-// A CPURateLimiter limits the number of active calls based on CPU requirements
-type CPURateLimiter struct {
-	impl *RateLimit
-}
-
-func NewCPURateLimiter(capacity int64) *CPURateLimiter {
-	if capacity <= 0 {
-		capacity = int64(runtime.NumCPU())
-	}
-	impl := NewRateLimit(capacity)
-	return &CPURateLimiter{impl: impl}
-}
-
-func (e CPURateLimiter) Request() {
-	e.impl.Request(1)
-}
-
-func (e CPURateLimiter) Finish() {
-	e.impl.Finish(1)
-}
-
-func (e CPURateLimiter) Stop() {
-	e.impl.Stop()
-}
-
-// A MemoryRateLimiter limits the number of active calls based on Memory requirements
-type MemoryRateLimiter struct {
-	*RateLimit
-}
-
-func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
-	if capacity <= 0 {
-		capacity = 512 * 1024 * 1024 // 512MB
-	}
-	impl := NewRateLimit(capacity)
-	return &MemoryRateLimiter{RateLimit: impl}
-}
diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go
index 3d47f2c..d634dda 100644
--- a/cmd/soong_zip/soong_zip.go
+++ b/cmd/soong_zip/soong_zip.go
@@ -163,8 +163,7 @@
 	errors   chan error
 	writeOps chan chan *zipEntry
 
-	cpuRateLimiter    *CPURateLimiter
-	memoryRateLimiter *MemoryRateLimiter
+	rateLimit *RateLimit
 
 	compressorPool sync.Pool
 	compLevel      int
@@ -296,12 +295,8 @@
 	// The RateLimit object will put upper bounds on the number of
 	// parallel compressions and outstanding buffers.
 	z.writeOps = make(chan chan *zipEntry, 1000)
-	z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
-	z.memoryRateLimiter = NewMemoryRateLimiter(0)
-	defer func() {
-		z.cpuRateLimiter.Stop()
-		z.memoryRateLimiter.Stop()
-	}()
+	z.rateLimit = NewRateLimit(*parallelJobs, 0)
+	defer z.rateLimit.Stop()
 
 	go func() {
 		var err error
@@ -391,7 +386,7 @@
 			if err != nil {
 				return err
 			}
-			z.memoryRateLimiter.Finish(count)
+			z.rateLimit.Release(int(count))
 
 			currentReader = nil
 
@@ -461,7 +456,7 @@
 		return err
 	}
 
-	z.cpuRateLimiter.Request()
+	exec := z.rateLimit.RequestExecution()
 
 	if method == zip.Deflate && fileSize >= minParallelFileSize {
 		wg := new(sync.WaitGroup)
@@ -478,14 +473,14 @@
 		// know the result before we can begin writing the compressed
 		// data out to the zipfile.
 		wg.Add(1)
-		go z.crcFile(r, ze, compressChan, wg)
+		go z.crcFile(r, ze, exec, compressChan, wg)
 
 		for start := int64(0); start < fileSize; start += parallelBlockSize {
 			sr := io.NewSectionReader(r, start, parallelBlockSize)
 			resultChan := make(chan io.Reader, 1)
 			ze.futureReaders <- resultChan
 
-			z.cpuRateLimiter.Request()
+			exec := z.rateLimit.RequestExecution()
 
 			last := !(start+parallelBlockSize < fileSize)
 			var dict []byte
@@ -494,7 +489,7 @@
 			}
 
 			wg.Add(1)
-			go z.compressPartialFile(sr, dict, last, resultChan, wg)
+			go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
 		}
 
 		close(ze.futureReaders)
@@ -505,15 +500,15 @@
 			f.Close()
 		}(wg, r)
 	} else {
-		go z.compressWholeFile(ze, r, compressChan)
+		go z.compressWholeFile(ze, r, exec, compressChan)
 	}
 
 	return nil
 }
 
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
 	defer wg.Done()
-	defer z.cpuRateLimiter.Finish()
+	defer exec.Finish(0)
 
 	crc := crc32.NewIEEE()
 	_, err := io.Copy(crc, r)
@@ -527,7 +522,7 @@
 	close(resultChan)
 }
 
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
 	defer wg.Done()
 
 	result, err := z.compressBlock(r, dict, last)
@@ -536,9 +531,7 @@
 		return
 	}
 
-	z.memoryRateLimiter.Request(int64(result.Len()))
-	z.cpuRateLimiter.Finish()
-
+	exec.Finish(result.Len())
 	resultChan <- result
 }
 
@@ -576,7 +569,7 @@
 	return buf, nil
 }
 
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
 	var bufSize int
 
 	defer r.Close()
@@ -645,9 +638,7 @@
 		bufSize = int(ze.fh.UncompressedSize64)
 	}
 
-	z.memoryRateLimiter.Request(int64(bufSize))
-	z.cpuRateLimiter.Finish()
-
+	exec.Finish(bufSize)
 	close(futureReader)
 
 	compressChan <- ze
@@ -718,7 +709,7 @@
 	// We didn't ask permission to execute, since this should be very short,
 	// but we still need to increment the outstanding buffer size, since the
 	// read will decrement it.
-	z.memoryRateLimiter.Finish(int64(-len(dest)))
+	z.rateLimit.Release(-len(dest))
 
 	ze <- &zipEntry{
 		fh:            fileHeader,