Fix corner cases and improve benchmarks section
alitto committed Jun 7, 2020
1 parent dfbb355 commit 5f7cd69
Showing 6 changed files with 43 additions and 16 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -30,7 +30,7 @@ Some common scenarios include:
- Stopping a worker pool
- Task panics are handled gracefully (configurable panic handler)
- Supports Non-blocking and Blocking task submission modes (buffered / unbuffered)
- Very high performance under heavy workloads (See [benchmarks](./benchmark/README.md))
- Very high performance and efficient resource usage under heavy workloads, even outperforming unbounded goroutines in some scenarios (See [benchmarks](./benchmark/README.md))
- **New (since v1.3.0)**: configurable pool resizing strategy, with 3 presets for common scenarios: Eager, Balanced and Lazy (see the sketch below this list).
- [API reference](https://pkg.go.dev/github.com/alitto/pond)
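
For illustration, a pool using one of these presets might be created like this. This is a minimal sketch based on the documented v1 API (`pond.New`, `pond.Strategy` and the preset constructors); treat it as an example, not canonical usage:

```go
package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {
	// Up to 100 workers, buffered capacity for 1000 queued tasks,
	// and the Balanced preset deciding when to spawn new workers.
	pool := pond.New(100, 1000, pond.Strategy(pond.Balanced()))
	defer pool.StopAndWait()

	for i := 0; i < 10; i++ {
		n := i
		pool.Submit(func() {
			fmt.Println("processing task", n)
		})
	}
}
```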

14 changes: 11 additions & 3 deletions benchmark/README.md
@@ -13,6 +13,8 @@ All pools are configured to use a maximum of 200k workers and initialization tim

Here are the results of the benchmark when submitting an asynchronous task that just sleeps for 10ms (`time.Sleep(10 * time.Millisecond)`):

![Benchmark results - Sleep 10ms on 8 cpus](../docs/benchmark-results-sleep10ms-8cpu.svg)

```bash
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllSleep.*)$' -benchtime=3x -cpu=8
goos: linux
@@ -64,6 +66,8 @@ ok github.com/alitto/pond/benchmark 138.009s

And these are the results of the benchmark when submitting a synchronous task that just calculates a random float64 number between 0 and 1 (`rand.Float64()`):

![Benchmark results - Random Float64 on 8 cpus](../docs/benchmark-results-randf64-8cpu.svg)

```bash
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllRand.*)$' -benchtime=3x -cpu=8
goos: linux
@@ -113,11 +117,13 @@ PASS
ok github.com/alitto/pond/benchmark 93.386s
```

As you can see, _pond_'s resizing strategies (Eager, Balanced or Lazy) behave differently under different workloads and generally one or more of these strategies outperform all the other worker pool implementations, including unbounded goroutines under some specific circumstances.
As you can see, _pond_'s resizing strategies (Eager, Balanced or Lazy) behave differently under different workloads and generally one or more of these strategies outperform all the other worker pool implementations, except for unbounded goroutines.

When running this benchmark with fewer available CPUs, the difference becomes even more significant. For instance, when using only 4 CPUs, _pond_ consistently outperforms launching unbounded goroutines.
Leaving aside the fact that launching unlimited goroutines defeats the goal of limiting concurrency over a resource, its performance is highly dependent on how many resources (CPU and memory) are available at a given time, which makes it unpredictable and likely to cause starvation. In other words, it's generally not a good idea for production applications.
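
To make the contrast concrete, here is a minimal sketch (hypothetical task and numbers, not the benchmark code) of the two submission styles being compared:

```go
package main

import (
	"time"

	"github.com/alitto/pond"
)

func main() {
	task := func() { time.Sleep(10 * time.Millisecond) }

	// Unbounded: one goroutine per task. Launching is cheap, but memory
	// and scheduler pressure grow with the number of in-flight tasks.
	for i := 0; i < 100000; i++ {
		go task()
	}

	// Bounded: at most 200 tasks run concurrently; the rest wait in the
	// buffered queue, keeping resource usage predictable.
	pool := pond.New(200, 100000)
	for i := 0; i < 100000; i++ {
		pool.Submit(task)
	}
	pool.StopAndWait()
}
```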

Here are the results when using 4 CPUs and submitting the asynchronous task:
We also wanted to see how _pond_ behaves when resources are more constrained, so we repeated the asynchronous task benchmark (Sleep 10ms), but this time using only 4 CPUs:

![Benchmark results - Sleep 10ms on 4 cpus](../docs/benchmark-results-sleep10ms-4cpu.svg)

```bash
go test -benchmem -run=^$ github.com/alitto/pond/benchmark -bench '^(BenchmarkAllSleep.*)$' -benchtime=3x -cpu=4
@@ -168,4 +174,6 @@ PASS
ok github.com/alitto/pond/benchmark 152.726s
```

When running with fewer available CPUs, the difference against the other worker pool implementations becomes clearer, and _pond_ even runs faster than launching unbounded goroutines in some of the workloads (when users <= 10k).

These tests were executed on a laptop with an 8-core CPU (Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz) and 16GB of RAM.
1 change: 1 addition & 0 deletions docs/benchmark-results-randf64-8cpu.svg
1 change: 1 addition & 0 deletions docs/benchmark-results-sleep10ms-4cpu.svg
1 change: 1 addition & 0 deletions docs/benchmark-results-sleep10ms-8cpu.svg
40 changes: 28 additions & 12 deletions pond.go
@@ -72,6 +72,7 @@ type WorkerPool struct {
purgerQuit chan struct{}
stopOnce sync.Once
waitGroup sync.WaitGroup
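// mutex guards the worker count check-and-increment (see incrementWorkerCount)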
mutex sync.Mutex
}

// New creates a worker pool that can scale up to the given maximum number of workers (maxWorkers).
@@ -123,7 +124,7 @@ func New(maxWorkers, maxCapacity int, options ...Option) *WorkerPool {
// Start minWorkers workers
if pool.minWorkers > 0 {
for i := 0; i < pool.minWorkers; i++ {
pool.startWorker(nil)
pool.maybeStartWorker(nil)
}
}

@@ -178,8 +179,7 @@ func (p *WorkerPool) submit(task func(), waitForIdle bool) bool {
}

// Start a worker as long as we haven't reached the limit
if !maxWorkersReached && p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) {
p.startWorker(task)
if ok := p.maybeStartWorker(task); ok {
return true
}

@@ -251,7 +251,7 @@ Purge:
select {
// Timed out waiting for any activity to happen, attempt to kill an idle worker
case <-idleTicker.C:
if p.Idle() > 0 {
if p.Idle() > 0 && p.Running() > p.minWorkers {
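// A nil task signals one idle worker to exit; the p.Running() > p.minWorkers guard keeps the pool from shrinking below its minimum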
p.tasks <- nil
}
case <-p.purgerQuit:
@@ -265,28 +265,44 @@
}

// maybeStartWorker attempts to launch a new worker goroutine to run the given task, if the pool can still grow
func (p *WorkerPool) startWorker(firstTask func()) {
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {

// Increment worker count
p.incrementWorkerCount()
// Attempt to increment worker count
if ok := p.incrementWorkerCount(); !ok {
return false
}

// Launch worker
go worker(firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.panicHandler)

return true
}

func (p *WorkerPool) incrementWorkerCount() {
func (p *WorkerPool) incrementWorkerCount() bool {

// Increment worker count
// Attempt to increment worker count
p.mutex.Lock()
runningWorkerCount := p.Running()
// Execute the resizing strategy to determine if we can create more workers
if !p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers) || runningWorkerCount >= p.maxWorkers {
p.mutex.Unlock()
return false
}
atomic.AddInt32(&p.workerCount, 1)
p.mutex.Unlock()

// Increment waiting group semaphore
p.waitGroup.Add(1)

return true
}

func (p *WorkerPool) decrementWorkerCount() {

// Decrement worker count
p.mutex.Lock()
atomic.AddInt32(&p.workerCount, -1)
p.mutex.Unlock()

// Decrement waiting group semaphore
p.waitGroup.Done()
Expand All @@ -311,11 +327,11 @@ func worker(firstTask func(), tasks chan func(), idleWorkerCount *int32, exitHan
// Restart goroutine
go worker(nil, tasks, idleWorkerCount, exitHandler, panicHandler)
} else {
// Handle normal exit
exitHandler()

// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)

// Handle normal exit
exitHandler()
}
}()

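This reordering matters because the old `startWorker` path checked the resizing strategy outside any lock: two concurrent submitters could both observe a worker count below `maxWorkers` and both start a worker, overshooting the limit. A minimal standalone sketch of the guarded check-then-increment pattern (illustrative only, not the library's code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter mirrors the pattern now used by incrementWorkerCount: the count
// is read and compared against the limit while holding a mutex, so two
// concurrent callers can never both pass the check and overshoot.
// The atomic ops let other code read the count without taking the lock.
type limiter struct {
	mutex sync.Mutex
	count int32
	limit int32
}

func (l *limiter) tryAcquire() bool {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if atomic.LoadInt32(&l.count) >= l.limit {
		return false
	}
	atomic.AddInt32(&l.count, 1)
	return true
}

func main() {
	l := &limiter{limit: 4}
	var acquired int32
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if l.tryAcquire() {
				atomic.AddInt32(&acquired, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(acquired) // always prints 4, never more
}
```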
