Merge "Consolidate localIncludeDirs and exportIncludeDirs"
diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go
index 9e95bc1..9cb5fdd 100644
--- a/cmd/soong_zip/rate_limit.go
+++ b/cmd/soong_zip/rate_limit.go
@@ -15,71 +15,54 @@
package main
import (
+ "fmt"
"runtime"
)
type RateLimit struct {
- requests chan struct{}
- finished chan int
- released chan int
- stop chan struct{}
+ requests chan request
+ completions chan int64
+
+ stop chan struct{}
}
-// NewRateLimit starts a new rate limiter with maxExecs number of executions
-// allowed to happen at a time. If maxExecs is <= 0, it will default to the
-// number of logical CPUs on the system.
-//
-// With Finish and Release, we'll keep track of outstanding buffer sizes to be
-// written. If that size goes above maxMem, we'll prevent starting new
-// executions.
-//
-// The total memory use may be higher due to current executions. This just
-// prevents runaway memory use due to slower writes.
-func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
- if maxExecs <= 0 {
- maxExecs = runtime.NumCPU()
- }
- if maxMem <= 0 {
- // Default to 512MB
- maxMem = 512 * 1024 * 1024
- }
+type request struct {
+ size int64
+ serviced chan struct{}
+}
+// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
+// except when no capacity is in use, in which case the first caller is always permitted
+func NewRateLimit(capacity int64) *RateLimit {
ret := &RateLimit{
- requests: make(chan struct{}),
+ requests: make(chan request),
+ completions: make(chan int64),
- // Let all of the pending executions to mark themselves as finished,
- // even if our goroutine isn't processing input.
- finished: make(chan int, maxExecs),
-
- released: make(chan int),
- stop: make(chan struct{}),
+ stop: make(chan struct{}),
}
- go ret.goFunc(maxExecs, maxMem)
+ go ret.monitorChannels(capacity)
return ret
}
-// RequestExecution blocks until another execution can be allowed to run.
-func (r *RateLimit) RequestExecution() Execution {
- <-r.requests
- return r.finished
+// RequestExecution blocks until another execution of size <size> can be allowed to run.
+func (r *RateLimit) Request(size int64) {
+ request := request{
+ size: size,
+ serviced: make(chan struct{}, 1),
+ }
+
+ // wait for the request to be received
+ r.requests <- request
+
+ // wait for the request to be accepted
+ <-request.serviced
}
-type Execution chan<- int
-
-// Finish will mark your execution as finished, and allow another request to be
-// approved.
-//
-// bufferSize may be specified to count memory buffer sizes, and must be
-// matched with calls to RateLimit.Release to mark the buffers as released.
-func (e Execution) Finish(bufferSize int) {
- e <- bufferSize
-}
-
-// Call Release when finished with a buffer recorded with Finish.
-func (r *RateLimit) Release(bufferSize int) {
- r.released <- bufferSize
+// Finish declares the completion of an execution of size <size>
+func (r *RateLimit) Finish(size int64) {
+ r.completions <- size
}
// Stop the background goroutine
@@ -87,29 +70,83 @@
close(r.stop)
}
-func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
- var curExecs int
- var curMemory int64
+// monitorChannels processes incoming requests from channels
+func (r *RateLimit) monitorChannels(capacity int64) {
+ var usedCapacity int64
+ var currentRequest *request
for {
- var requests chan struct{}
- if curExecs < maxExecs && curMemory < maxMem {
+ var requests chan request
+ if currentRequest == nil {
+ // If we don't already have a queued request, then we should check for a new request
requests = r.requests
}
select {
- case requests <- struct{}{}:
- curExecs++
- case amount := <-r.finished:
- curExecs--
- curMemory += int64(amount)
- if curExecs < 0 {
- panic("curExecs < 0")
+ case newRequest := <-requests:
+ currentRequest = &newRequest
+ case amountCompleted := <-r.completions:
+ usedCapacity -= amountCompleted
+
+ if usedCapacity < 0 {
+ panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
}
- case amount := <-r.released:
- curMemory -= int64(amount)
case <-r.stop:
return
}
+
+ if currentRequest != nil {
+ accepted := false
+ if usedCapacity == 0 {
+ accepted = true
+ } else {
+ if capacity >= usedCapacity+currentRequest.size {
+ accepted = true
+ }
+ }
+ if accepted {
+ usedCapacity += currentRequest.size
+ currentRequest.serviced <- struct{}{}
+ currentRequest = nil
+ }
+ }
}
}
+
+// A CPURateLimiter limits the number of active calls based on CPU requirements
+type CPURateLimiter struct {
+ impl *RateLimit
+}
+
+func NewCPURateLimiter(capacity int64) *CPURateLimiter {
+ if capacity <= 0 {
+ capacity = int64(runtime.NumCPU())
+ }
+ impl := NewRateLimit(capacity)
+ return &CPURateLimiter{impl: impl}
+}
+
+func (e CPURateLimiter) Request() {
+ e.impl.Request(1)
+}
+
+func (e CPURateLimiter) Finish() {
+ e.impl.Finish(1)
+}
+
+func (e CPURateLimiter) Stop() {
+ e.impl.Stop()
+}
+
+// A MemoryRateLimiter limits the number of active calls based on Memory requirements
+type MemoryRateLimiter struct {
+ *RateLimit
+}
+
+func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
+ if capacity <= 0 {
+ capacity = 512 * 1024 * 1024 // 512MB
+ }
+ impl := NewRateLimit(capacity)
+ return &MemoryRateLimiter{RateLimit: impl}
+}
diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go
index d634dda..cbec102 100644
--- a/cmd/soong_zip/soong_zip.go
+++ b/cmd/soong_zip/soong_zip.go
@@ -163,7 +163,8 @@
errors chan error
writeOps chan chan *zipEntry
- rateLimit *RateLimit
+ cpuRateLimiter *CPURateLimiter
+ memoryRateLimiter *MemoryRateLimiter
compressorPool sync.Pool
compLevel int
@@ -174,6 +175,10 @@
// List of delayed io.Reader
futureReaders chan chan io.Reader
+
+ // Only used for passing into the MemoryRateLimiter to ensure we
+ // release as much memory as much as we request
+ allocatedSize int64
}
func main() {
@@ -295,9 +300,12 @@
// The RateLimit object will put the upper bounds on the number of
// parallel compressions and outstanding buffers.
z.writeOps = make(chan chan *zipEntry, 1000)
- z.rateLimit = NewRateLimit(*parallelJobs, 0)
- defer z.rateLimit.Stop()
-
+ z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
+ z.memoryRateLimiter = NewMemoryRateLimiter(0)
+ defer func() {
+ z.cpuRateLimiter.Stop()
+ z.memoryRateLimiter.Stop()
+ }()
go func() {
var err error
defer close(z.writeOps)
@@ -369,6 +377,7 @@
currentWriter.Close()
currentWriter = nil
}
+ z.memoryRateLimiter.Finish(op.allocatedSize)
case futureReader, ok := <-readersChan:
if !ok {
@@ -381,12 +390,10 @@
currentReader = futureReader
case reader := <-currentReader:
- var count int64
- count, err = io.Copy(currentWriter, reader)
+ _, err = io.Copy(currentWriter, reader)
if err != nil {
return err
}
- z.rateLimit.Release(int(count))
currentReader = nil
@@ -456,7 +463,9 @@
return err
}
- exec := z.rateLimit.RequestExecution()
+ ze.allocatedSize = fileSize
+ z.cpuRateLimiter.Request()
+ z.memoryRateLimiter.Request(ze.allocatedSize)
if method == zip.Deflate && fileSize >= minParallelFileSize {
wg := new(sync.WaitGroup)
@@ -473,14 +482,14 @@
// know the result before we can begin writing the compressed
// data out to the zipfile.
wg.Add(1)
- go z.crcFile(r, ze, exec, compressChan, wg)
+ go z.crcFile(r, ze, compressChan, wg)
for start := int64(0); start < fileSize; start += parallelBlockSize {
sr := io.NewSectionReader(r, start, parallelBlockSize)
resultChan := make(chan io.Reader, 1)
ze.futureReaders <- resultChan
- exec := z.rateLimit.RequestExecution()
+ z.cpuRateLimiter.Request()
last := !(start+parallelBlockSize < fileSize)
var dict []byte
@@ -489,7 +498,7 @@
}
wg.Add(1)
- go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
+ go z.compressPartialFile(sr, dict, last, resultChan, wg)
}
close(ze.futureReaders)
@@ -500,15 +509,15 @@
f.Close()
}(wg, r)
} else {
- go z.compressWholeFile(ze, r, exec, compressChan)
+ go z.compressWholeFile(ze, r, compressChan)
}
return nil
}
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
defer wg.Done()
- defer exec.Finish(0)
+ defer z.cpuRateLimiter.Finish()
crc := crc32.NewIEEE()
_, err := io.Copy(crc, r)
@@ -522,7 +531,7 @@
close(resultChan)
}
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
defer wg.Done()
result, err := z.compressBlock(r, dict, last)
@@ -531,7 +540,8 @@
return
}
- exec.Finish(result.Len())
+ z.cpuRateLimiter.Finish()
+
resultChan <- result
}
@@ -569,9 +579,7 @@
return buf, nil
}
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
- var bufSize int
-
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
defer r.Close()
crc := crc32.NewIEEE()
@@ -616,7 +624,6 @@
}
if uint64(compressed.Len()) < ze.fh.UncompressedSize64 {
futureReader <- compressed
- bufSize = compressed.Len()
} else {
buf, err := readFile(r)
if err != nil {
@@ -625,7 +632,6 @@
}
ze.fh.Method = zip.Store
futureReader <- bytes.NewReader(buf)
- bufSize = int(ze.fh.UncompressedSize64)
}
} else {
buf, err := readFile(r)
@@ -635,10 +641,10 @@
}
ze.fh.Method = zip.Store
futureReader <- bytes.NewReader(buf)
- bufSize = int(ze.fh.UncompressedSize64)
}
- exec.Finish(bufSize)
+ z.cpuRateLimiter.Finish()
+
close(futureReader)
compressChan <- ze
@@ -706,11 +712,6 @@
futureReader <- bytes.NewBufferString(dest)
close(futureReader)
- // We didn't ask permission to execute, since this should be very short
- // but we still need to increment the outstanding buffer sizes, since
- // the read will decrement the buffer size.
- z.rateLimit.Release(-len(dest))
-
ze <- &zipEntry{
fh: fileHeader,
futureReaders: futureReaders,