Refactor rate_limit.go for more clarity

This wasn't intended to change the behavior, but it does slightly.

  Previously any requests to acquire memory wouldn't block; only
  (subsequent) requests for execution would block (if another
  caller had already consumed the memory quota). Now the requests
  for memory can also block.

It turns out in a brief test on my computer that soong_zip
runs about 10 to 20% faster with this change than without it.

  The final step involving soong_zip decreased from about
  3.6 sec to about 3.3 sec in tests on my computer.

  When testing the process of re-zipping the contents of
  angler-target_files*.zip , the time decreased from about
  6.3 sec to about 5.3 sec in tests on my computer, and the
  peak memory usage reported by `top` decreased from about
  1.5g to 1g

Bug: 64536066
Test: m -j dist showcommands # which runs soong_zip to package everything
Change-Id: I0422e4c363c675bb7a93309fac4616c768bfbe8f
diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 9e95bc1..04102b7 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,71 +15,54 @@
 package main
 
 import (
+	"fmt"
 	"runtime"
 )
 
 type RateLimit struct {
-	requests chan struct{}
-	finished chan int
-	released chan int
-	stop     chan struct{}
+	requests    chan request
+	completions chan int64
+
+	stop chan struct{}
 }
 
-// NewRateLimit starts a new rate limiter with maxExecs number of executions
-// allowed to happen at a time. If maxExecs is <= 0, it will default to the
-// number of logical CPUs on the system.
-//
-// With Finish and Release, we'll keep track of outstanding buffer sizes to be
-// written. If that size goes above maxMem, we'll prevent starting new
-// executions.
-//
-// The total memory use may be higher due to current executions. This just
-// prevents runaway memory use due to slower writes.
-func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
-	if maxExecs <= 0 {
-		maxExecs = runtime.NumCPU()
-	}
-	if maxMem <= 0 {
-		// Default to 512MB
-		maxMem = 512 * 1024 * 1024
-	}
+type request struct {
+	size     int64
+	serviced chan struct{}
+}
 
+// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
+// except when no capacity is in use, in which case the first caller is always permitted
+func NewRateLimit(capacity int64) *RateLimit {
 	ret := &RateLimit{
-		requests: make(chan struct{}),
+		requests:    make(chan request),
+		completions: make(chan int64),
 
-		// Let all of the pending executions to mark themselves as finished,
-		// even if our goroutine isn't processing input.
-		finished: make(chan int, maxExecs),
-
-		released: make(chan int),
-		stop:     make(chan struct{}),
+		stop: make(chan struct{}),
 	}
 
-	go ret.goFunc(maxExecs, maxMem)
+	go ret.monitorChannels(capacity)
 
 	return ret
 }
 
-// RequestExecution blocks until another execution can be allowed to run.
-func (r *RateLimit) RequestExecution() Execution {
-	<-r.requests
-	return r.finished
+// Request blocks until an execution of size <size> can be allowed to run.
+func (r *RateLimit) Request(size int64) {
+	request := request{
+		size:     size,
+		serviced: make(chan struct{}, 1),
+	}
+
+	// wait for the request to be received
+	r.requests <- request
+
+	// wait for the request to be accepted
+	<-request.serviced
 }
 
-type Execution chan<- int
-
-// Finish will mark your execution as finished, and allow another request to be
-// approved.
-//
-// bufferSize may be specified to count memory buffer sizes, and must be
-// matched with calls to RateLimit.Release to mark the buffers as released.
-func (e Execution) Finish(bufferSize int) {
-	e <- bufferSize
-}
-
-// Call Release when finished with a buffer recorded with Finish.
-func (r *RateLimit) Release(bufferSize int) {
-	r.released <- bufferSize
+// Finish declares the completion of an execution of size <size>
+func (r *RateLimit) Finish(size int64) {
+	r.completions <- size
 }
 
 // Stop the background goroutine
@@ -87,29 +70,83 @@
 	close(r.stop)
 }
 
-func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
-	var curExecs int
-	var curMemory int64
+// monitorChannels processes incoming requests from channels
+func (r *RateLimit) monitorChannels(capacity int64) {
+	var usedCapacity int64
+	var currentRequest *request
 
 	for {
-		var requests chan struct{}
-		if curExecs < maxExecs && curMemory < maxMem {
+		var requests chan request
+		if currentRequest == nil {
+			// If we don't already have a queued request, then we should check for a new request
 			requests = r.requests
 		}
 
 		select {
-		case requests <- struct{}{}:
-			curExecs++
-		case amount := <-r.finished:
-			curExecs--
-			curMemory += int64(amount)
-			if curExecs < 0 {
-				panic("curExecs < 0")
+		case newRequest := <-requests:
+			currentRequest = &newRequest
+			break
+		case amountCompleted := <-r.completions:
+			usedCapacity -= amountCompleted
+			if usedCapacity < 0 {
+				panic(fmt.Sprintf("usedCapacity < 0: %v", usedCapacity))
 			}
-		case amount := <-r.released:
-			curMemory -= int64(amount)
 		case <-r.stop:
 			return
 		}
+
+		if currentRequest != nil {
+			accepted := false
+			if usedCapacity == 0 {
+				accepted = true
+			} else {
+				if capacity >= usedCapacity+currentRequest.size {
+					accepted = true
+				}
+			}
+			if accepted {
+				usedCapacity += currentRequest.size
+				currentRequest.serviced <- struct{}{}
+				currentRequest = nil
+			}
+		}
 	}
 }
+
+// A CPURateLimiter limits the number of active calls based on CPU requirements
+type CPURateLimiter struct {
+	impl *RateLimit
+}
+
+func NewCPURateLimiter(capacity int64) *CPURateLimiter {
+	if capacity <= 0 {
+		capacity = int64(runtime.NumCPU())
+	}
+	impl := NewRateLimit(capacity)
+	return &CPURateLimiter{impl: impl}
+}
+
+func (e CPURateLimiter) Request() {
+	e.impl.Request(1)
+}
+
+func (e CPURateLimiter) Finish() {
+	e.impl.Finish(1)
+}
+
+func (e CPURateLimiter) Stop() {
+	e.impl.Stop()
+}
+
+// A MemoryRateLimiter limits the number of active calls based on Memory requirements
+type MemoryRateLimiter struct {
+	*RateLimit
+}
+
+func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
+	if capacity <= 0 {
+		capacity = 512 * 1024 * 1024 // 512MB
+	}
+	impl := NewRateLimit(capacity)
+	return &MemoryRateLimiter{RateLimit: impl}
+}