Refactor rate_limit.go for more clarify
This wasn't intended to change the behavior, but it does slightly.
Previously any requests to acquire memory wouldn't block; only
(subsequent) requests for execution would block (if another
caller had already consumed the memory quota). Now the requests
for memory can also also block.
It turns out in a brief test on my computer that soong_zip
runs about 10 to 20% faster with this change than without it.
The final step involving soong_zip decreased from about
3.6 sec to about 3.3 sec in tests on my computer.
When testing the process of re-zipping the contents of
angler-target_files*.zip , the time decreased from about
6.3 sec to about 5.3 sec in tests on my computer, and the
peak memory usage reported by `top` decreased from about
1.5g to 1g
Bug: 64536066
Test: m -j dist showcommands # which runs soong_zip to package everything
Change-Id: I0422e4c363c675bb7a93309fac4616c768bfbe8f
diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go
index d634dda..3d47f2c 100644
--- a/cmd/soong_zip/soong_zip.go
+++ b/cmd/soong_zip/soong_zip.go
@@ -163,7 +163,8 @@
errors chan error
writeOps chan chan *zipEntry
- rateLimit *RateLimit
+ cpuRateLimiter *CPURateLimiter
+ memoryRateLimiter *MemoryRateLimiter
compressorPool sync.Pool
compLevel int
@@ -295,8 +296,12 @@
// The RateLimit object will put the upper bounds on the number of
// parallel compressions and outstanding buffers.
z.writeOps = make(chan chan *zipEntry, 1000)
- z.rateLimit = NewRateLimit(*parallelJobs, 0)
- defer z.rateLimit.Stop()
+ z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
+ z.memoryRateLimiter = NewMemoryRateLimiter(0)
+ defer func() {
+ z.cpuRateLimiter.Stop()
+ z.memoryRateLimiter.Stop()
+ }()
go func() {
var err error
@@ -386,7 +391,7 @@
if err != nil {
return err
}
- z.rateLimit.Release(int(count))
+ z.memoryRateLimiter.Finish(count)
currentReader = nil
@@ -456,7 +461,7 @@
return err
}
- exec := z.rateLimit.RequestExecution()
+ z.cpuRateLimiter.Request()
if method == zip.Deflate && fileSize >= minParallelFileSize {
wg := new(sync.WaitGroup)
@@ -473,14 +478,14 @@
// know the result before we can begin writing the compressed
// data out to the zipfile.
wg.Add(1)
- go z.crcFile(r, ze, exec, compressChan, wg)
+ go z.crcFile(r, ze, compressChan, wg)
for start := int64(0); start < fileSize; start += parallelBlockSize {
sr := io.NewSectionReader(r, start, parallelBlockSize)
resultChan := make(chan io.Reader, 1)
ze.futureReaders <- resultChan
- exec := z.rateLimit.RequestExecution()
+ z.cpuRateLimiter.Request()
last := !(start+parallelBlockSize < fileSize)
var dict []byte
@@ -489,7 +494,7 @@
}
wg.Add(1)
- go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
+ go z.compressPartialFile(sr, dict, last, resultChan, wg)
}
close(ze.futureReaders)
@@ -500,15 +505,15 @@
f.Close()
}(wg, r)
} else {
- go z.compressWholeFile(ze, r, exec, compressChan)
+ go z.compressWholeFile(ze, r, compressChan)
}
return nil
}
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
defer wg.Done()
- defer exec.Finish(0)
+ defer z.cpuRateLimiter.Finish()
crc := crc32.NewIEEE()
_, err := io.Copy(crc, r)
@@ -522,7 +527,7 @@
close(resultChan)
}
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
defer wg.Done()
result, err := z.compressBlock(r, dict, last)
@@ -531,7 +536,9 @@
return
}
- exec.Finish(result.Len())
+ z.memoryRateLimiter.Request(int64(result.Len()))
+ z.cpuRateLimiter.Finish()
+
resultChan <- result
}
@@ -569,7 +576,7 @@
return buf, nil
}
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
var bufSize int
defer r.Close()
@@ -638,7 +645,9 @@
bufSize = int(ze.fh.UncompressedSize64)
}
- exec.Finish(bufSize)
+ z.memoryRateLimiter.Request(int64(bufSize))
+ z.cpuRateLimiter.Finish()
+
close(futureReader)
compressChan <- ze
@@ -709,7 +718,7 @@
// We didn't ask permission to execute, since this should be very short
// but we still need to increment the outstanding buffer sizes, since
// the read will decrement the buffer size.
- z.rateLimit.Release(-len(dest))
+ z.memoryRateLimiter.Finish(int64(-len(dest)))
ze <- &zipEntry{
fh: fileHeader,