|  | // Copyright 2016 Google Inc. All rights reserved. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | package zip | 
|  |  | 
|  | import ( | 
|  | "fmt" | 
|  | "runtime" | 
|  | ) | 
|  |  | 
|  | type RateLimit struct { | 
|  | requests    chan request | 
|  | completions chan int64 | 
|  |  | 
|  | stop chan struct{} | 
|  | } | 
|  |  | 
|  | type request struct { | 
|  | size     int64 | 
|  | serviced chan struct{} | 
|  | } | 
|  |  | 
|  | // NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once, | 
|  | // except when no capacity is in use, in which case the first caller is always permitted | 
|  | func NewRateLimit(capacity int64) *RateLimit { | 
|  | ret := &RateLimit{ | 
|  | requests:    make(chan request), | 
|  | completions: make(chan int64), | 
|  |  | 
|  | stop: make(chan struct{}), | 
|  | } | 
|  |  | 
|  | go ret.monitorChannels(capacity) | 
|  |  | 
|  | return ret | 
|  | } | 
|  |  | 
|  | // RequestExecution blocks until another execution of size <size> can be allowed to run. | 
|  | func (r *RateLimit) Request(size int64) { | 
|  | request := request{ | 
|  | size:     size, | 
|  | serviced: make(chan struct{}, 1), | 
|  | } | 
|  |  | 
|  | // wait for the request to be received | 
|  | r.requests <- request | 
|  |  | 
|  | // wait for the request to be accepted | 
|  | <-request.serviced | 
|  | } | 
|  |  | 
|  | // Finish declares the completion of an execution of size <size> | 
|  | func (r *RateLimit) Finish(size int64) { | 
|  | r.completions <- size | 
|  | } | 
|  |  | 
|  | // Stop the background goroutine | 
|  | func (r *RateLimit) Stop() { | 
|  | close(r.stop) | 
|  | } | 
|  |  | 
|  | // monitorChannels processes incoming requests from channels | 
|  | func (r *RateLimit) monitorChannels(capacity int64) { | 
|  | var usedCapacity int64 | 
|  | var currentRequest *request | 
|  |  | 
|  | for { | 
|  | var requests chan request | 
|  | if currentRequest == nil { | 
|  | // If we don't already have a queued request, then we should check for a new request | 
|  | requests = r.requests | 
|  | } | 
|  |  | 
|  | select { | 
|  | case newRequest := <-requests: | 
|  | currentRequest = &newRequest | 
|  | case amountCompleted := <-r.completions: | 
|  | usedCapacity -= amountCompleted | 
|  |  | 
|  | if usedCapacity < 0 { | 
|  | panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted)) | 
|  | } | 
|  | case <-r.stop: | 
|  | return | 
|  | } | 
|  |  | 
|  | if currentRequest != nil { | 
|  | accepted := false | 
|  | if usedCapacity == 0 { | 
|  | accepted = true | 
|  | } else { | 
|  | if capacity >= usedCapacity+currentRequest.size { | 
|  | accepted = true | 
|  | } | 
|  | } | 
|  | if accepted { | 
|  | usedCapacity += currentRequest.size | 
|  | currentRequest.serviced <- struct{}{} | 
|  | currentRequest = nil | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // A CPURateLimiter limits the number of active calls based on CPU requirements | 
|  | type CPURateLimiter struct { | 
|  | impl *RateLimit | 
|  | } | 
|  |  | 
|  | func NewCPURateLimiter(capacity int64) *CPURateLimiter { | 
|  | if capacity <= 0 { | 
|  | capacity = int64(runtime.NumCPU()) | 
|  | } | 
|  | impl := NewRateLimit(capacity) | 
|  | return &CPURateLimiter{impl: impl} | 
|  | } | 
|  |  | 
|  | func (e CPURateLimiter) Request() { | 
|  | e.impl.Request(1) | 
|  | } | 
|  |  | 
|  | func (e CPURateLimiter) Finish() { | 
|  | e.impl.Finish(1) | 
|  | } | 
|  |  | 
|  | func (e CPURateLimiter) Stop() { | 
|  | e.impl.Stop() | 
|  | } | 
|  |  | 
|  | // A MemoryRateLimiter limits the number of active calls based on Memory requirements | 
|  | type MemoryRateLimiter struct { | 
|  | *RateLimit | 
|  | } | 
|  |  | 
|  | func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { | 
|  | if capacity <= 0 { | 
|  | capacity = 512 * 1024 * 1024 // 512MB | 
|  | } | 
|  | impl := NewRateLimit(capacity) | 
|  | return &MemoryRateLimiter{RateLimit: impl} | 
|  | } |