split soong_zip into a library and a binary
to make it faster/easier to invoke from other Go programs
(such as multiproduct_kati)
Bug: 67478260
Test: m -j
Change-Id: Idd2671a44290550197c88f53dd11a6dd39c85cc5
diff --git a/zip/rate_limit.go b/zip/rate_limit.go
new file mode 100644
index 0000000..0ea2ef0
--- /dev/null
+++ b/zip/rate_limit.go
@@ -0,0 +1,152 @@
+// Copyright 2016 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zip
+
+import (
+ "fmt"
+ "runtime"
+)
+
+type RateLimit struct { // RateLimit admits requests while the total in-use size stays within a capacity
+ requests chan request // unbuffered; Request sends here and monitorChannels receives
+ completions chan int64 // unbuffered; Finish sends the released size here
+
+ stop chan struct{} // closed by Stop to make monitorChannels return
+}
+
+type request struct { // request is one pending admission handed from Request to monitorChannels
+ size int64 // amount of capacity being asked for
+ serviced chan struct{} // monitorChannels sends one value here once the request is accepted
+}
+
+// NewRateLimit starts a new rate limiter that permits up to <capacity> to be in use at once,
+// except that when no capacity is in use the next request is admitted regardless of its size.
+func NewRateLimit(capacity int64) *RateLimit {
+ ret := &RateLimit{
+ requests: make(chan request),
+ completions: make(chan int64),
+
+ stop: make(chan struct{}),
+ }
+
+ go ret.monitorChannels(capacity) // background goroutine; returns once Stop closes ret.stop
+
+ return ret
+}
+
+// Request blocks until an execution of size <size> is allowed to run.
+func (r *RateLimit) Request(size int64) {
+ request := request{
+ size: size,
+ serviced: make(chan struct{}, 1), // buffered so the monitor's acceptance send does not block
+ }
+
+ // hand the request to the monitor goroutine; blocks until it is received
+ r.requests <- request
+
+ // block until the monitor signals that the request was accepted
+ <-request.serviced
+}
+
+// Finish declares the completion of an execution of size <size>, releasing that much capacity.
+func (r *RateLimit) Finish(size int64) {
+ r.completions <- size // unbuffered send: blocks until the monitor goroutine receives it
+}
+
+// Stop ends the background goroutine by closing the stop channel; calling it twice panics (double close).
+func (r *RateLimit) Stop() {
+ close(r.stop)
+}
+
+// monitorChannels is the background loop: it holds at most one pending request and tracks used capacity.
+func (r *RateLimit) monitorChannels(capacity int64) {
+ var usedCapacity int64 // sum of sizes accepted and not yet reported via completions
+ var currentRequest *request // received but not yet accepted; nil when none is waiting
+
+ for {
+ var requests chan request // left nil while a request is pending, so that select case is never ready
+ if currentRequest == nil {
+ // No request is waiting for capacity, so we are free to receive a new one
+ requests = r.requests
+ }
+
+ select {
+ case newRequest := <-requests:
+ currentRequest = &newRequest
+ case amountCompleted := <-r.completions:
+ usedCapacity -= amountCompleted
+
+ if usedCapacity < 0 { // more was finished than was ever accepted
+ panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
+ }
+ case <-r.stop:
+ return
+ }
+
+ if currentRequest != nil {
+ accepted := false
+ if usedCapacity == 0 {
+ accepted = true // nothing in use: admit even if size exceeds capacity
+ } else {
+ if capacity >= usedCapacity+currentRequest.size {
+ accepted = true // fits within the remaining capacity
+ }
+ }
+ if accepted {
+ usedCapacity += currentRequest.size
+ currentRequest.serviced <- struct{}{} // wakes the caller blocked in Request
+ currentRequest = nil
+ }
+ }
+ }
+}
+
+// A CPURateLimiter limits the number of concurrently active calls; each call counts as one unit of capacity.
+type CPURateLimiter struct {
+ impl *RateLimit // underlying limiter; every Request/Finish uses size 1
+}
+
+func NewCPURateLimiter(capacity int64) *CPURateLimiter { // capacity <= 0 selects runtime.NumCPU() as the limit
+ if capacity <= 0 {
+ capacity = int64(runtime.NumCPU()) // default: one slot per CPU reported by the runtime
+ }
+ impl := NewRateLimit(capacity)
+ return &CPURateLimiter{impl: impl}
+}
+
+func (e CPURateLimiter) Request() { // Request blocks until one unit of capacity is granted
+ e.impl.Request(1)
+}
+
+func (e CPURateLimiter) Finish() { // Finish releases the one unit taken by a matching Request
+ e.impl.Finish(1)
+}
+
+func (e CPURateLimiter) Stop() { // Stop shuts down the underlying limiter's background goroutine
+ e.impl.Stop()
+}
+
+// A MemoryRateLimiter limits active calls by the memory they declare (sizes presumably in bytes - confirm with callers).
+type MemoryRateLimiter struct {
+ *RateLimit // embedded, so Request, Finish and Stop are promoted to MemoryRateLimiter
+}
+
+func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { // capacity <= 0 selects the default below
+ if capacity <= 0 {
+ capacity = 512 * 1024 * 1024 // 512MB
+ }
+ impl := NewRateLimit(capacity)
+ return &MemoryRateLimiter{RateLimit: impl}
+}