forked from cludden/benthos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
305 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
// Copyright (c) 2018 Ashley Jeffs | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
// Package ratelimit implements a rate limiter to be shared across components | ||
// hitting the same protected resource. | ||
package ratelimit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
// Copyright (c) 2018 Ashley Jeffs | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package ratelimit | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
"time" | ||
) | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
// Config is a config struct containing rate limit fields. | ||
type Config struct { | ||
Count int `json:"count" yaml:"count"` | ||
Interval string `json:"interval" yaml:"interval"` | ||
} | ||
|
||
// NewConfig returns a rate limit configuration struct with default values. | ||
func NewConfig() Config { | ||
return Config{ | ||
Count: 1000, | ||
Interval: "1s", | ||
} | ||
} | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
// Type is a structure that tracks a rate limit, it can be shared across | ||
// parallel processes in order to maintain a maximum rate of a protected | ||
// resource. | ||
type Type struct { | ||
mut sync.Mutex | ||
bucket int | ||
lastRefresh time.Time | ||
|
||
size int | ||
period time.Duration | ||
} | ||
|
||
// New creates a rate limit from a configuration struct. This type is safe to | ||
// share and call from parallel goroutines. | ||
func New(conf Config) (*Type, error) { | ||
if conf.Count <= 0 { | ||
return nil, errors.New("count must be larger than zero") | ||
} | ||
period, err := time.ParseDuration(conf.Interval) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse interval: %v", err) | ||
} | ||
return &Type{ | ||
bucket: conf.Count, | ||
lastRefresh: time.Now(), | ||
size: conf.Count, | ||
period: period, | ||
}, nil | ||
} | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
// Get access to the rate limited resource. Returns a bool indicating whether | ||
// access is permitted and, if false, an appropriate time period to wait before | ||
// requesting again. | ||
func (r *Type) Get() (bool, time.Duration) { | ||
r.mut.Lock() | ||
r.bucket-- | ||
|
||
if r.bucket < 0 { | ||
r.bucket = 0 | ||
remaining := r.period - time.Since(r.lastRefresh) | ||
|
||
if remaining > 0 { | ||
r.mut.Unlock() | ||
return false, remaining | ||
} | ||
r.bucket = r.size - 1 | ||
r.lastRefresh = time.Now() | ||
} | ||
r.mut.Unlock() | ||
return true, 0 | ||
} | ||
|
||
//------------------------------------------------------------------------------ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
// Copyright (c) 2018 Ashley Jeffs | ||
// | ||
// Permission is hereby granted, free of charge, to any person obtaining a copy | ||
// of this software and associated documentation files (the "Software"), to deal | ||
// in the Software without restriction, including without limitation the rights | ||
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
// copies of the Software, and to permit persons to whom the Software is | ||
// furnished to do so, subject to the following conditions: | ||
// | ||
// The above copyright notice and this permission notice shall be included in | ||
// all copies or substantial portions of the Software. | ||
// | ||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
// THE SOFTWARE. | ||
|
||
package ratelimit | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
"time" | ||
) | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
func TestRateLimitConfErrors(t *testing.T) { | ||
conf := NewConfig() | ||
conf.Count = -1 | ||
if _, err := New(conf); err == nil { | ||
t.Error("expected error from bad count") | ||
} | ||
|
||
conf = NewConfig() | ||
conf.Interval = "nope" | ||
if _, err := New(conf); err == nil { | ||
t.Error("expected error from bad interval") | ||
} | ||
} | ||
|
||
func TestRateLimitBasic(t *testing.T) { | ||
conf := NewConfig() | ||
conf.Count = 10 | ||
conf.Interval = "1s" | ||
|
||
rl, err := New(conf) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
for i := 0; i < conf.Count; i++ { | ||
got, period := rl.Get() | ||
if !got { | ||
t.Errorf("Rate limited on get %v", i) | ||
} | ||
if period > 0 { | ||
t.Errorf("Period above zero: %v", period) | ||
} | ||
} | ||
|
||
if got, period := rl.Get(); got { | ||
t.Error("Expected limit on final request") | ||
} else { | ||
if period > time.Second { | ||
t.Errorf("Period beyond interval: %v", period) | ||
} | ||
if period <= 0 { | ||
t.Errorf("Period less than or equal to zero: %v", period) | ||
} | ||
} | ||
} | ||
|
||
func TestRateLimitRefresh(t *testing.T) { | ||
conf := NewConfig() | ||
conf.Count = 10 | ||
conf.Interval = "10ms" | ||
|
||
rl, err := New(conf) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
for i := 0; i < conf.Count; i++ { | ||
got, period := rl.Get() | ||
if !got { | ||
t.Errorf("Rate limited on get %v", i) | ||
} | ||
if period > 0 { | ||
t.Errorf("Period above zero: %v", period) | ||
} | ||
} | ||
|
||
if got, period := rl.Get(); got { | ||
t.Error("Expected limit on final request") | ||
} else { | ||
if period > time.Second { | ||
t.Errorf("Period beyond interval: %v", period) | ||
} | ||
if period <= 0 { | ||
t.Errorf("Period less than or equal to zero: %v", period) | ||
} | ||
} | ||
|
||
<-time.After(time.Millisecond * 15) | ||
|
||
for i := 0; i < conf.Count; i++ { | ||
got, period := rl.Get() | ||
if !got { | ||
t.Errorf("Rate limited on get %v", i) | ||
} | ||
if period > 0 { | ||
t.Errorf("Period above zero: %v", period) | ||
} | ||
} | ||
|
||
if got, period := rl.Get(); got { | ||
t.Error("Expected limit on final request") | ||
} else { | ||
if period > time.Second { | ||
t.Errorf("Period beyond interval: %v", period) | ||
} | ||
if period <= 0 { | ||
t.Errorf("Period less than or equal to zero: %v", period) | ||
} | ||
} | ||
} | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
func BenchmarkRateLimit(b *testing.B) { | ||
/* A rate limit is typically going to be protecting a networked resource | ||
* where the request will likely be measured at least in hundreds of | ||
* microseconds. It would be reasonable to assume the rate limit might be | ||
* shared across tens of components. | ||
* | ||
* Therefore, we can probably sit comfortably with lock contention across | ||
* one hundred or so parallel components adding an overhead of single digit | ||
* microseconds. Since this benchmark doesn't take into account the actual | ||
* request duration after receiving a rate limit I've set the number of | ||
* components to ten in order to compensate. | ||
*/ | ||
b.ReportAllocs() | ||
|
||
nParallel := 10 | ||
startChan := make(chan struct{}) | ||
wg := sync.WaitGroup{} | ||
wg.Add(nParallel) | ||
|
||
conf := NewConfig() | ||
conf.Count = 1000 | ||
conf.Interval = "1ns" | ||
|
||
rl, err := New(conf) | ||
if err != nil { | ||
b.Fatal(err) | ||
} | ||
|
||
for i := 0; i < nParallel; i++ { | ||
go func() { | ||
<-startChan | ||
for j := 0; j < b.N; j++ { | ||
ok, period := rl.Get() | ||
if !ok { | ||
time.Sleep(period) | ||
} | ||
} | ||
wg.Done() | ||
}() | ||
} | ||
|
||
b.ResetTimer() | ||
close(startChan) | ||
wg.Wait() | ||
} | ||
|
||
//------------------------------------------------------------------------------ |