Skip to content

Commit

Permalink
feat: adding ExampleNewThrottleBy and ExampleNewThrottleByWithCount
Browse files Browse the repository at this point in the history
  • Loading branch information
samber committed Jan 26, 2025
1 parent fdd8865 commit 5cd3266
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 18 deletions.
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# lo - Iterate over slices, maps, channels...

[![tag](https://img.shields.io/github/tag/samber/lo.svg)](https://github.com/samber/lo/releases)
Expand Down Expand Up @@ -3419,11 +3420,12 @@ cancel("second key")
[[play](https://go.dev/play/p/d3Vpt6pxhY8)]

### Throttle
`NewThrottle` creates a throttled instance that invokes given functions only once in every interval.

Creates a throttled instance that invokes given functions only once in every interval.

This returns 2 functions, First one is throttled function and Second one is a function to reset interval.

```go

f := func() {
println("Called once in every 100ms")
}
Expand All @@ -3437,17 +3439,16 @@ for j := 0; j < 10; j++ {

reset()
throttle()

```

`NewThrottleWithCount` is NewThrottle with count limit, throttled function will be invoked count times in every interval.
```go

```go
f := func() {
println("Called three times in every 100ms")
}

throttle, reset := lo.NewThrottle(100 * time.Millisecond, f)
throttle, reset := lo.NewThrottleWithCount(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle()
Expand All @@ -3456,7 +3457,24 @@ for j := 0; j < 10; j++ {

reset()
throttle()
```

`NewThrottleBy` and `NewThrottleByWithCount` are NewThrottle with sharding key, throttled function will be invoked count times in every interval.

```go
f := func(key string) {
println(key, "Called three times in every 100ms")
}

throttle, reset := lo.NewThrottleByWithCount(100 * time.Millisecond, f)

for j := 0; j < 10; j++ {
throttle("foo")
time.Sleep(30 * time.Millisecond)
}

reset()
throttle()
```

### Synchronize
Expand Down
53 changes: 40 additions & 13 deletions retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,23 +287,29 @@ func (t *Transaction[T]) Process(state T) (T, error) {
return state, err
}

type throttle struct {
// @TODO: single mutex per key ?
type throttleBy[T comparable] struct {
mu *sync.Mutex
timer *time.Timer
interval time.Duration
callbacks []func()
callbacks []func(key T)
countLimit int
count int
count map[T]int
}

func (th *throttle) throttledFunc() {
func (th *throttleBy[T]) throttledFunc(key T) {
th.mu.Lock()
defer th.mu.Unlock()
if th.count < th.countLimit {
th.count++

if _, ok := th.count[key]; !ok {
th.count[key] = 0
}

if th.count[key] < th.countLimit {
th.count[key]++

for _, f := range th.callbacks {
f()
f(key)
}

}
Expand All @@ -314,35 +320,56 @@ func (th *throttle) throttledFunc() {
}
}

func (th *throttle) reset() {
func (th *throttleBy[T]) reset() {
th.mu.Lock()
defer th.mu.Unlock()

if th.timer != nil {
th.timer.Stop()
}

th.count = 0
th.count = map[T]int{}
th.timer = nil

}

// NewThrottle creates a throttled instance that invokes given functions only once in every interval.
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
func NewThrottle(interval time.Duration, f ...func()) (func(), func()) {
func NewThrottle(interval time.Duration, f ...func()) (throttle func(), reset func()) {
return NewThrottleWithCount(interval, 1, f...)
}

// NewThrottleWithCount is NewThrottle with count limit, throttled function will be invoked count times in every interval.
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (func(), func()) {
func NewThrottleWithCount(interval time.Duration, count int, f ...func()) (throttle func(), reset func()) {
callbacks := Map(f, func(item func(), _ int) func(struct{}) {
return func(struct{}) {
item()
}
})

throttleFn, reset := NewThrottleByWithCount[struct{}](interval, count, callbacks...)
return func() {
throttleFn(struct{}{})
}, reset
}

// NewThrottleBy creates a throttled instance that invokes given functions only once in every interval.
// This returns 2 functions, First one is throttled function and Second one is a function to reset interval
func NewThrottleBy[T comparable](interval time.Duration, f ...func(key T)) (throttle func(key T), reset func()) {
return NewThrottleByWithCount[T](interval, 1, f...)
}

// NewThrottleByWithCount is NewThrottleBy with count limit, throttled function will be invoked count times in every interval.
func NewThrottleByWithCount[T comparable](interval time.Duration, count int, f ...func(key T)) (throttle func(key T), reset func()) {
if count <= 0 {
count = 1
}
th := &throttle{

th := &throttleBy[T]{
mu: new(sync.Mutex),
interval: interval,
callbacks: f,
countLimit: count,
count: map[T]int{},
}
return th.throttledFunc, th.reset
}
89 changes: 89 additions & 0 deletions retry_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,92 @@ func ExampleTransaction_error() {
// -5
// error
}

func ExampleNewThrottle() {
throttle, reset := NewThrottle(100*time.Millisecond, func() {
fmt.Println("Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
}

func ExampleNewThrottleWithCount() {
throttle, reset := NewThrottleWithCount(100*time.Millisecond, 2, func() {
fmt.Println("Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle()
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
// Called once in every 100ms
}

func ExampleNewThrottleBy() {
throttle, reset := NewThrottleBy(100*time.Millisecond, func(key string) {
fmt.Println(key, "Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle("foo")
throttle("bar")
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
}

func ExampleNewThrottleByWithCount() {
throttle, reset := NewThrottleByWithCount(100*time.Millisecond, 2, func(key string) {
fmt.Println(key, "Called once in every 100ms")
})

for j := 0; j < 10; j++ {
throttle("foo")
throttle("bar")
time.Sleep(30 * time.Millisecond)
}

reset()

// Output:
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
// foo Called once in every 100ms
// bar Called once in every 100ms
}
82 changes: 82 additions & 0 deletions retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,85 @@ func TestNewThrottleWithCount(t *testing.T) {

is.Equal(9, callCount)
}

func TestNewThrottleBy(t *testing.T) {
t.Parallel()
is := assert.New(t)
callCountA := 0
callCountB := 0
f1 := func(key string) {
if key == "a" {
callCountA++
} else {
callCountB++
}
}
th, reset := NewThrottleBy[string](10*time.Millisecond, f1)

is.Equal(0, callCountA)
is.Equal(0, callCountB)
for j := 0; j < 100; j++ {
th("a")
th("b")
}
is.Equal(1, callCountA)
is.Equal(1, callCountB)

time.Sleep(15 * time.Millisecond)

for j := 0; j < 100; j++ {
th("a")
th("b")
}

is.Equal(2, callCountA)
is.Equal(2, callCountB)

// reset counter
reset()
th("a")
is.Equal(3, callCountA)
is.Equal(2, callCountB)

}

func TestNewThrottleByWithCount(t *testing.T) {
t.Parallel()
is := assert.New(t)
callCountA := 0
callCountB := 0
f1 := func(key string) {
if key == "a" {
callCountA++
} else {
callCountB++
}
}
th, reset := NewThrottleByWithCount(10*time.Millisecond, 3, f1)

// the function does not throttle for initial count number
for i := 0; i < 20; i++ {
th("a")
th("b")
}
is.Equal(3, callCountA)
is.Equal(3, callCountB)

time.Sleep(11 * time.Millisecond)

for i := 0; i < 20; i++ {
th("a")
th("b")
}

is.Equal(6, callCountA)
is.Equal(6, callCountB)

reset()
for i := 0; i < 20; i++ {
th("a")
}

is.Equal(9, callCountA)
is.Equal(6, callCountB)
}

0 comments on commit 5cd3266

Please sign in to comment.