Revert "Refactor rate_limit.go for more clarify"
This reverts commit 69f3b3e9460f4ba41fd82c005311c8c50c88936a.
The refactoring caused rare deadlocks.
Bug: 64536066
Bug: 64813447
Change-Id: Ieb1b931bb2c0afdd8bf8edbfc32c373df3c08d8d
diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 04102b7..9e95bc1 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,54 +15,71 @@
package main
import (
- "fmt"
"runtime"
)
type RateLimit struct {
- requests chan request
- completions chan int64
-
- stop chan struct{}
+ requests chan struct{}
+ finished chan int
+ released chan int
+ stop chan struct{}
}
-type request struct {
- size int64
- serviced chan struct{}
-}
-
-// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
-// except when no capacity is in use, in which case the first caller is always permitted
-func NewRateLimit(capacity int64) *RateLimit {
- ret := &RateLimit{
- requests: make(chan request),
- completions: make(chan int64),
-
- stop: make(chan struct{}),
+// NewRateLimit starts a new rate limiter that allows up to maxExecs
+// executions to run at a time. If maxExecs is <= 0, it defaults to the
+// number of logical CPUs on the system.
+//
+// With Finish and Release, we'll keep track of outstanding buffer sizes to be
+// written. If that size goes above maxMem, we'll prevent starting new
+// executions.
+//
+// The total memory use may be higher due to current executions. This just
+// prevents runaway memory use due to slower writes.
+func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
+ if maxExecs <= 0 {
+ maxExecs = runtime.NumCPU()
+ }
+ if maxMem <= 0 {
+ // Default to 512MB
+ maxMem = 512 * 1024 * 1024
}
- go ret.monitorChannels(capacity)
+ ret := &RateLimit{
+ requests: make(chan struct{}),
+
+ // Let all of the pending executions mark themselves as finished,
+ // even if our goroutine isn't processing input.
+ finished: make(chan int, maxExecs),
+
+ released: make(chan int),
+ stop: make(chan struct{}),
+ }
+
+ go ret.goFunc(maxExecs, maxMem)
return ret
}
-// RequestExecution blocks until another execution of size <size> can be allowed to run.
-func (r *RateLimit) Request(size int64) {
- request := request{
- size: size,
- serviced: make(chan struct{}, 1),
- }
-
- // wait for the request to be received
- r.requests <- request
-
- // wait for the request to be accepted
- <-request.serviced
+// RequestExecution blocks until another execution can be allowed to run.
+func (r *RateLimit) RequestExecution() Execution {
+ <-r.requests
+ return r.finished
}
-// Finish declares the completion of an execution of size <size>
-func (r *RateLimit) Finish(size int64) {
- r.completions <- size
+type Execution chan<- int
+
+// Finish will mark your execution as finished, and allow another request to be
+// approved.
+//
+// bufferSize may be specified to count memory buffer sizes, and must be
+// matched with calls to RateLimit.Release to mark the buffers as released.
+func (e Execution) Finish(bufferSize int) {
+ e <- bufferSize
+}
+
+// Call Release when finished with a buffer recorded with Finish.
+func (r *RateLimit) Release(bufferSize int) {
+ r.released <- bufferSize
}
// Stop the background goroutine
@@ -70,83 +87,29 @@
close(r.stop)
}
-// monitorChannels processes incoming requests from channels
-func (r *RateLimit) monitorChannels(capacity int64) {
- var usedCapacity int64
- var currentRequest *request
+func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
+ var curExecs int
+ var curMemory int64
for {
- var requests chan request
- if currentRequest == nil {
- // If we don't already have a queued request, then we should check for a new request
+ var requests chan struct{}
+ if curExecs < maxExecs && curMemory < maxMem {
requests = r.requests
}
select {
- case newRequest := <-requests:
- currentRequest = &newRequest
- break
- case amountCompleted := <-r.completions:
- usedCapacity -= amountCompleted
- if usedCapacity < 0 {
- panic(fmt.Sprintf("usedCapacity < 0: %v", usedCapacity))
+ case requests <- struct{}{}:
+ curExecs++
+ case amount := <-r.finished:
+ curExecs--
+ curMemory += int64(amount)
+ if curExecs < 0 {
+ panic("curExecs < 0")
}
+ case amount := <-r.released:
+ curMemory -= int64(amount)
case <-r.stop:
return
}
-
- if currentRequest != nil {
- accepted := false
- if usedCapacity == 0 {
- accepted = true
- } else {
- if capacity >= usedCapacity+currentRequest.size {
- accepted = true
- }
- }
- if accepted {
- usedCapacity += currentRequest.size
- currentRequest.serviced <- struct{}{}
- currentRequest = nil
- }
- }
}
}
-
-// A CPURateLimiter limits the number of active calls based on CPU requirements
-type CPURateLimiter struct {
- impl *RateLimit
-}
-
-func NewCPURateLimiter(capacity int64) *CPURateLimiter {
- if capacity <= 0 {
- capacity = int64(runtime.NumCPU())
- }
- impl := NewRateLimit(capacity)
- return &CPURateLimiter{impl: impl}
-}
-
-func (e CPURateLimiter) Request() {
- e.impl.Request(1)
-}
-
-func (e CPURateLimiter) Finish() {
- e.impl.Finish(1)
-}
-
-func (e CPURateLimiter) Stop() {
- e.impl.Stop()
-}
-
-// A MemoryRateLimiter limits the number of active calls based on Memory requirements
-type MemoryRateLimiter struct {
- *RateLimit
-}
-
-func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
- if capacity <= 0 {
- capacity = 512 * 1024 * 1024 // 512MB
- }
- impl := NewRateLimit(capacity)
- return &MemoryRateLimiter{RateLimit: impl}
-}
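
For reference, a minimal usage sketch of the API this revert restores. It
assumes the sketch is compiled alongside rate_limit.go in package main;
produceBuffer, example, and the sizes used are hypothetical stand-ins for
soong_zip's real compression and write paths, not part of this change:

package main

import (
	"bytes"
	"fmt"
)

// produceBuffer is a hypothetical stand-in for the real work
// (e.g. compressing file contents into an in-memory buffer).
func produceBuffer(i int) []byte {
	return bytes.Repeat([]byte{byte(i)}, 1024)
}

func example() {
	// maxExecs <= 0 defaults to runtime.NumCPU(); maxMem <= 0 defaults to 512MB.
	limiter := NewRateLimit(0, 0)
	defer limiter.Stop()

	const n = 10
	results := make(chan []byte, n)

	for i := 0; i < n; i++ {
		// Blocks until the limiter approves another execution.
		exec := limiter.RequestExecution()
		go func(i int) {
			buf := produceBuffer(i)
			// Mark the execution finished; buf's size now counts
			// against maxMem until Release is called.
			exec.Finish(len(buf))
			results <- buf
		}(i)
	}

	written := 0
	for i := 0; i < n; i++ {
		buf := <-results
		written += len(buf) // stand-in for the slow write to the output
		// The buffer has been consumed; stop counting it against maxMem.
		limiter.Release(len(buf))
	}
	fmt.Println("wrote", written, "bytes")
}

The property the revert restores is that goFunc alone owns curExecs and
curMemory, so callers coordinate with it purely over channels and no locks
are taken.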