blob: 9e95bc14b597308b02685f745e232f3af67c466f [file] [log] [blame]
Dan Willemsen017d8932016-08-04 15:43:03 -07001// Copyright 2016 Google Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "runtime"
19)
20
// RateLimit bounds how many executions may run at once and tracks the total
// size of buffers reported through Execution.Finish until they are handed
// back with Release. All counters are owned by the background goroutine
// running goFunc; every other method only talks to it over these channels.
type RateLimit struct {
	// requests delivers one token per approved execution. RequestExecution
	// receives from it.
	requests chan struct{}

	// finished carries the buffer size reported by each completed execution.
	finished chan int

	// released carries the sizes of buffers that are no longer outstanding.
	released chan int

	// stop is closed by Stop to make the background goroutine return.
	stop chan struct{}
}
27
Colin Cross526416b2017-08-17 23:54:51 +000028// NewRateLimit starts a new rate limiter with maxExecs number of executions
29// allowed to happen at a time. If maxExecs is <= 0, it will default to the
30// number of logical CPUs on the system.
31//
32// With Finish and Release, we'll keep track of outstanding buffer sizes to be
33// written. If that size goes above maxMem, we'll prevent starting new
34// executions.
35//
36// The total memory use may be higher due to current executions. This just
37// prevents runaway memory use due to slower writes.
38func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
39 if maxExecs <= 0 {
40 maxExecs = runtime.NumCPU()
41 }
42 if maxMem <= 0 {
43 // Default to 512MB
44 maxMem = 512 * 1024 * 1024
Dan Willemsen017d8932016-08-04 15:43:03 -070045 }
46
Colin Cross526416b2017-08-17 23:54:51 +000047 ret := &RateLimit{
48 requests: make(chan struct{}),
49
50 // Let all of the pending executions to mark themselves as finished,
51 // even if our goroutine isn't processing input.
52 finished: make(chan int, maxExecs),
53
54 released: make(chan int),
55 stop: make(chan struct{}),
56 }
57
58 go ret.goFunc(maxExecs, maxMem)
Dan Willemsen017d8932016-08-04 15:43:03 -070059
60 return ret
61}
62
Colin Cross526416b2017-08-17 23:54:51 +000063// RequestExecution blocks until another execution can be allowed to run.
64func (r *RateLimit) RequestExecution() Execution {
65 <-r.requests
66 return r.finished
Dan Willemsen017d8932016-08-04 15:43:03 -070067}
68
// Execution is the handle returned by RateLimit.RequestExecution for one
// approved execution. It is the send-only side of a channel of buffer sizes.
type Execution chan<- int

// Finish reports that this execution is done, which lets the limiter approve
// another request.
//
// bufferSize may be used to account for a memory buffer the execution left
// behind; every non-zero size reported here must later be matched by a call
// to RateLimit.Release once that buffer has been released.
func (e Execution) Finish(bufferSize int) {
	e <- bufferSize
}
79
80// Call Release when finished with a buffer recorded with Finish.
81func (r *RateLimit) Release(bufferSize int) {
82 r.released <- bufferSize
Dan Willemsen017d8932016-08-04 15:43:03 -070083}
84
85// Stop the background goroutine
86func (r *RateLimit) Stop() {
87 close(r.stop)
88}
89
Colin Cross526416b2017-08-17 23:54:51 +000090func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
91 var curExecs int
92 var curMemory int64
Dan Willemsen017d8932016-08-04 15:43:03 -070093
94 for {
Colin Cross526416b2017-08-17 23:54:51 +000095 var requests chan struct{}
96 if curExecs < maxExecs && curMemory < maxMem {
Dan Willemsen017d8932016-08-04 15:43:03 -070097 requests = r.requests
98 }
99
100 select {
Colin Cross526416b2017-08-17 23:54:51 +0000101 case requests <- struct{}{}:
102 curExecs++
103 case amount := <-r.finished:
104 curExecs--
105 curMemory += int64(amount)
106 if curExecs < 0 {
107 panic("curExecs < 0")
Dan Willemsen017d8932016-08-04 15:43:03 -0700108 }
Colin Cross526416b2017-08-17 23:54:51 +0000109 case amount := <-r.released:
110 curMemory -= int64(amount)
Dan Willemsen017d8932016-08-04 15:43:03 -0700111 case <-r.stop:
112 return
113 }
114 }
115}