Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add started tasks count and time #58

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/dynamic_size/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/dynamic_size

go 1.19
go 1.21

require (
github.com/alitto/pond v1.7.1
)
toolchain go1.21.4

require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
8 changes: 4 additions & 4 deletions examples/fixed_size/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/fixed_size

go 1.19
go 1.21

require (
github.com/alitto/pond v1.7.1
)
toolchain go1.21.4

require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
8 changes: 4 additions & 4 deletions examples/group_context/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/group_context

go 1.19
go 1.21

require (
github.com/alitto/pond v1.7.1
)
toolchain go1.21.4

require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
4 changes: 3 additions & 1 deletion examples/pool_context/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/alitto/pond/examples/pool_context

go 1.19
go 1.21

toolchain go1.21.4

require github.com/alitto/pond v1.7.1

Expand Down
4 changes: 3 additions & 1 deletion examples/prometheus/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/alitto/pond/examples/fixed_size

go 1.19
go 1.21

toolchain go1.21.4

require (
github.com/alitto/pond v1.7.1
Expand Down
8 changes: 4 additions & 4 deletions examples/task_group/go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/alitto/pond/examples/task_group

go 1.19
go 1.21

require (
github.com/alitto/pond v1.7.1
)
toolchain go1.21.4

require github.com/alitto/pond v1.7.1

replace github.com/alitto/pond => ../../
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alitto/pond
module github.com/apono-io/pond

go 1.19
go 1.21
2 changes: 1 addition & 1 deletion group_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/alitto/pond"
"github.com/apono-io/pond"
)

func TestGroupSubmit(t *testing.T) {
Expand Down
36 changes: 30 additions & 6 deletions pond.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,15 @@ func Context(parentCtx context.Context) Option {
type WorkerPool struct {
// Atomic counters, should be placed first so alignment is guaranteed
// for atomic operations.
workerCount int32
idleWorkerCount int32
waitingTaskCount uint64
submittedTaskCount uint64
successfulTaskCount uint64
failedTaskCount uint64
workerCount int32
idleWorkerCount int32
waitingTaskCount uint64
submittedTaskCount uint64
successfulTaskCount uint64
failedTaskCount uint64
timedOutInQueueTaskCount uint64
startedTaskCount uint64
lastStartedTaskTime time.Time
// Configurable settings
maxWorkers int
maxCapacity int
Expand Down Expand Up @@ -189,6 +192,16 @@ func (p *WorkerPool) SubmittedTasks() uint64 {
return atomic.LoadUint64(&p.submittedTaskCount)
}

// StartedTasks returns the total number of tasks submitted & started since the pool was created
func (p *WorkerPool) StartedTasks() uint64 {
return atomic.LoadUint64(&p.startedTaskCount)
}

// LastStartedTaskTime returns the last time a task was taken from the queue by a worker for processing
func (p *WorkerPool) LastStartedTaskTime() time.Time {
return p.lastStartedTaskTime
}

// WaitingTasks returns the current number of tasks in the queue that are waiting to be executed
func (p *WorkerPool) WaitingTasks() uint64 {
return atomic.LoadUint64(&p.waitingTaskCount)
Expand All @@ -205,6 +218,11 @@ func (p *WorkerPool) FailedTasks() uint64 {
return atomic.LoadUint64(&p.failedTaskCount)
}

// FailedTasks returns the total number of tasks that timed-out in queue since the pool was created
func (p *WorkerPool) TimedOutInQueueTasks() uint64 {
return atomic.LoadUint64(&p.timedOutInQueueTaskCount)
}

// CompletedTasks returns the total number of tasks that have completed their exection either successfully
// or with panic since the pool was created
func (p *WorkerPool) CompletedTasks() uint64 {
Expand Down Expand Up @@ -306,6 +324,8 @@ func (p *WorkerPool) SubmitBefore(task func(), deadline time.Duration) {
select {
case <-timer.C:
// Deadline was reached, abort the task
atomic.AddUint64(&p.timedOutInQueueTaskCount, 1)
panic(fmt.Sprintf("task timed out in queue after %s", deadline))
default:
// Deadline not reached, execute the task
defer timer.Stop()
Expand Down Expand Up @@ -450,6 +470,10 @@ func (p *WorkerPool) executeTask(task func(), isFirstTask bool) {
// Decrement waiting task count
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))

// Increment started task count
atomic.AddUint64(&p.startedTaskCount, 1)
p.lastStartedTaskTime = time.Now()

// Execute task
task()

Expand Down
7 changes: 5 additions & 2 deletions pond_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"testing"
"time"

"github.com/alitto/pond"
"github.com/apono-io/pond"
)

func assertEqual(t *testing.T, expected interface{}, actual interface{}) {
Expand Down Expand Up @@ -198,8 +198,11 @@ func TestSubmitBefore(t *testing.T) {

pool.StopAndWait()

// Only 2 tasks must have executed
// Only 2 tasks must have executed and 1 timed out
assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
assertEqual(t, uint64(2), pool.SuccessfulTasks())
assertEqual(t, uint64(1), pool.TimedOutInQueueTasks())
assertEqual(t, uint64(1), pool.FailedTasks())
}

func TestSubmitBeforeWithNilTask(t *testing.T) {
Expand Down