Revert "Revert "Refactor rate_limit.go for more clarify""

This reverts commit 526416b1e49931b13948d1ccff8d50f2c1b4d178.

Figured out a fix for the deadlocks; resubmitting the patch.
The first version deadlocked because the switch statement in
zipWriter.write would commit to writing one specific zip entry, but
that entry's chunks might not all have been compressed yet. Since
each chunk had to request its own allocation, compressing the chunks
of the file currently being written could block waiting for memory
to be freed by chunks of other files that hadn't started being
written yet.
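
In code terms, the old per-chunk shape (taken from the removed lines
in the diff below; error handling elided) was:

    for start := int64(0); start < fileSize; start += parallelBlockSize {
        sr := io.NewSectionReader(r, start, parallelBlockSize)
        resultChan := make(chan io.Reader, 1)
        ze.futureReaders <- resultChan

        // Roughly: blocks until an execution slot is free and the
        // outstanding-buffer budget has room. That budget is only
        // returned via z.rateLimit.Release(count) after the writer
        // has copied a finished chunk out.
        exec := z.rateLimit.RequestExecution()

        go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
    }

Since zipWriter.write drains one entry at a time, RequestExecution
could block forever here: the budget might be held entirely by
already-compressed chunks of files the writer hadn't reached yet.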

This patch is much like the original, except that it preallocates
the memory for the entire file up front. (It happens to use the
total file size rather than the compressed size, but I didn't
observe that causing any performance difference.)
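
As a minimal sketch of the resulting invariant (a toy stand-in, not
the rate_limit.go implementation; memLimiter and newMemLimiter are
hypothetical names, and sizes are reduced to unit tokens):

    package main

    import (
        "fmt"
        "sync"
    )

    // memLimiter approximates MemoryRateLimiter with a channel used
    // as a counting semaphore; one token stands in for one unit of
    // memory.
    type memLimiter struct{ tokens chan struct{} }

    func newMemLimiter(capacity int) *memLimiter {
        return &memLimiter{tokens: make(chan struct{}, capacity)}
    }

    func (l *memLimiter) Request(size int) {
        for i := 0; i < size; i++ {
            l.tokens <- struct{}{} // blocks once the budget is exhausted
        }
    }

    func (l *memLimiter) Finish(size int) {
        for i := 0; i < size; i++ {
            <-l.tokens
        }
    }

    func main() {
        limiter := newMemLimiter(64)
        var wg sync.WaitGroup
        for _, fileSize := range []int{16, 32, 48} {
            // As in this patch: one Request for the file's entire
            // size before any chunk work starts, so no chunk can
            // stall mid-file on memory held by another file.
            limiter.Request(fileSize)
            wg.Add(1)
            go func(size int) {
                defer wg.Done()
                fmt.Println("compress and write", size, "units")
                limiter.Finish(size) // freed once the entry is written
            }(fileSize)
        }
        wg.Wait()
    }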

Bug: 64536066
Test: m -j dist showcommands # which runs soong_zip to package everything

Change-Id: Id1d7ff415e54d3a6be71188abbdbbbab5a719fcf
diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go
index d634dda..cbec102 100644
--- a/cmd/soong_zip/soong_zip.go
+++ b/cmd/soong_zip/soong_zip.go
@@ -163,7 +163,8 @@
 	errors   chan error
 	writeOps chan chan *zipEntry
 
-	rateLimit *RateLimit
+	cpuRateLimiter    *CPURateLimiter
+	memoryRateLimiter *MemoryRateLimiter
 
 	compressorPool sync.Pool
 	compLevel      int
@@ -174,6 +175,10 @@
 
 	// List of delayed io.Reader
 	futureReaders chan chan io.Reader
+
+	// Only used for passing into the MemoryRateLimiter to ensure we
+	// release as much memory as we request
+	allocatedSize int64
 }
 
 func main() {
@@ -295,9 +300,12 @@
 	// The RateLimit object will put the upper bounds on the number of
 	// parallel compressions and outstanding buffers.
 	z.writeOps = make(chan chan *zipEntry, 1000)
-	z.rateLimit = NewRateLimit(*parallelJobs, 0)
-	defer z.rateLimit.Stop()
-
+	z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
+	z.memoryRateLimiter = NewMemoryRateLimiter(0)
+	defer func() {
+		z.cpuRateLimiter.Stop()
+		z.memoryRateLimiter.Stop()
+	}()
 	go func() {
 		var err error
 		defer close(z.writeOps)
@@ -369,6 +377,7 @@
 				currentWriter.Close()
 				currentWriter = nil
 			}
+			z.memoryRateLimiter.Finish(op.allocatedSize)
 
 		case futureReader, ok := <-readersChan:
 			if !ok {
@@ -381,12 +390,10 @@
 			currentReader = futureReader
 
 		case reader := <-currentReader:
-			var count int64
-			count, err = io.Copy(currentWriter, reader)
+			_, err = io.Copy(currentWriter, reader)
 			if err != nil {
 				return err
 			}
-			z.rateLimit.Release(int(count))
 
 			currentReader = nil
 
@@ -456,7 +463,9 @@
 		return err
 	}
 
-	exec := z.rateLimit.RequestExecution()
+	ze.allocatedSize = fileSize
+	z.cpuRateLimiter.Request()
+	z.memoryRateLimiter.Request(ze.allocatedSize)
 
 	if method == zip.Deflate && fileSize >= minParallelFileSize {
 		wg := new(sync.WaitGroup)
@@ -473,14 +482,14 @@
 		// know the result before we can begin writing the compressed
 		// data out to the zipfile.
 		wg.Add(1)
-		go z.crcFile(r, ze, exec, compressChan, wg)
+		go z.crcFile(r, ze, compressChan, wg)
 
 		for start := int64(0); start < fileSize; start += parallelBlockSize {
 			sr := io.NewSectionReader(r, start, parallelBlockSize)
 			resultChan := make(chan io.Reader, 1)
 			ze.futureReaders <- resultChan
 
-			exec := z.rateLimit.RequestExecution()
+			z.cpuRateLimiter.Request()
 
 			last := !(start+parallelBlockSize < fileSize)
 			var dict []byte
@@ -489,7 +498,7 @@
 			}
 
 			wg.Add(1)
-			go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
+			go z.compressPartialFile(sr, dict, last, resultChan, wg)
 		}
 
 		close(ze.futureReaders)
@@ -500,15 +509,15 @@
 			f.Close()
 		}(wg, r)
 	} else {
-		go z.compressWholeFile(ze, r, exec, compressChan)
+		go z.compressWholeFile(ze, r, compressChan)
 	}
 
 	return nil
 }
 
-func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
+func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
 	defer wg.Done()
-	defer exec.Finish(0)
+	defer z.cpuRateLimiter.Finish()
 
 	crc := crc32.NewIEEE()
 	_, err := io.Copy(crc, r)
@@ -522,7 +531,7 @@
 	close(resultChan)
 }
 
-func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
+func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
 	defer wg.Done()
 
 	result, err := z.compressBlock(r, dict, last)
@@ -531,7 +540,8 @@
 		return
 	}
 
-	exec.Finish(result.Len())
+	z.cpuRateLimiter.Finish()
+
 	resultChan <- result
 }
 
@@ -569,9 +579,7 @@
 	return buf, nil
 }
 
-func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
-	var bufSize int
-
+func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
 	defer r.Close()
 
 	crc := crc32.NewIEEE()
@@ -616,7 +624,6 @@
 		}
 		if uint64(compressed.Len()) < ze.fh.UncompressedSize64 {
 			futureReader <- compressed
-			bufSize = compressed.Len()
 		} else {
 			buf, err := readFile(r)
 			if err != nil {
@@ -625,7 +632,6 @@
 			}
 			ze.fh.Method = zip.Store
 			futureReader <- bytes.NewReader(buf)
-			bufSize = int(ze.fh.UncompressedSize64)
 		}
 	} else {
 		buf, err := readFile(r)
@@ -635,10 +641,10 @@
 		}
 		ze.fh.Method = zip.Store
 		futureReader <- bytes.NewReader(buf)
-		bufSize = int(ze.fh.UncompressedSize64)
 	}
 
-	exec.Finish(bufSize)
+	z.cpuRateLimiter.Finish()
+
 	close(futureReader)
 
 	compressChan <- ze
@@ -706,11 +712,6 @@
 	futureReader <- bytes.NewBufferString(dest)
 	close(futureReader)
 
-	// We didn't ask permission to execute, since this should be very short
-	// but we still need to increment the outstanding buffer sizes, since
-	// the read will decrement the buffer size.
-	z.rateLimit.Release(-len(dest))
-
 	ze <- &zipEntry{
 		fh:            fileHeader,
 		futureReaders: futureReaders,