forked from alitto/pond
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request alitto#30 from alitto/feature/group-context
Implement task group associated to a Context & more
- Loading branch information
Showing
14 changed files
with
338 additions
and
66 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
module github.com/alitto/pond/examples/dynamic_size | ||
|
||
go 1.17 | ||
go 1.18 | ||
|
||
require ( | ||
github.com/alitto/pond v1.7.0 | ||
github.com/alitto/pond v1.7.1 | ||
) | ||
|
||
replace github.com/alitto/pond => ../../ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
module github.com/alitto/pond/examples/fixed_size | ||
|
||
go 1.17 | ||
go 1.18 | ||
|
||
require ( | ||
github.com/alitto/pond v1.7.0 | ||
github.com/alitto/pond v1.7.1 | ||
) | ||
|
||
replace github.com/alitto/pond => ../../ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
module github.com/alitto/pond/examples/group_context | ||
|
||
go 1.18 | ||
|
||
require ( | ||
github.com/alitto/pond v1.7.1 | ||
) | ||
|
||
replace github.com/alitto/pond => ../../ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
|
||
"github.com/alitto/pond" | ||
) | ||
|
||
func main() { | ||
|
||
// Create a worker pool | ||
pool := pond.New(10, 1000) | ||
defer pool.StopAndWait() | ||
|
||
// Create a task group associated to a context | ||
group, ctx := pool.GroupContext(context.Background()) | ||
|
||
var urls = []string{ | ||
"https://www.golang.org/", | ||
"https://www.google.com/", | ||
"https://www.github.com/", | ||
} | ||
|
||
// Submit tasks to fetch each URL | ||
for _, url := range urls { | ||
url := url | ||
group.Submit(func() error { | ||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
resp, err := http.DefaultClient.Do(req) | ||
if err == nil { | ||
resp.Body.Close() | ||
} | ||
return err | ||
}) | ||
} | ||
|
||
// Wait for all HTTP requests to complete. | ||
err := group.Wait() | ||
if err != nil { | ||
fmt.Printf("Failed to fetch URLs: %v", err) | ||
} else { | ||
fmt.Println("Successfully fetched all URLs") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
module github.com/alitto/pond/examples/pool_context | ||
|
||
go 1.17 | ||
go 1.18 | ||
|
||
require github.com/alitto/pond v1.7.0 | ||
require github.com/alitto/pond v1.7.1 | ||
|
||
replace github.com/alitto/pond => ../../ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
module github.com/alitto/pond/examples/task_group | ||
|
||
go 1.17 | ||
go 1.18 | ||
|
||
require ( | ||
github.com/alitto/pond v1.7.0 | ||
github.com/alitto/pond v1.7.1 | ||
) | ||
|
||
replace github.com/alitto/pond => ../../ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
module github.com/alitto/pond | ||
|
||
go 1.17 | ||
go 1.18 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package pond | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
) | ||
|
||
// TaskGroup represents a group of related tasks | ||
type TaskGroup struct { | ||
pool *WorkerPool | ||
waitGroup sync.WaitGroup | ||
} | ||
|
||
// Submit adds a task to this group and sends it to the worker pool to be executed | ||
func (g *TaskGroup) Submit(task func()) { | ||
g.waitGroup.Add(1) | ||
|
||
g.pool.Submit(func() { | ||
defer g.waitGroup.Done() | ||
|
||
task() | ||
}) | ||
} | ||
|
||
// Wait waits until all the tasks in this group have completed | ||
func (g *TaskGroup) Wait() { | ||
|
||
// Wait for all tasks to complete | ||
g.waitGroup.Wait() | ||
} | ||
|
||
// TaskGroup represents a group of related tasks associated to a context | ||
type TaskGroupWithContext struct { | ||
TaskGroup | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
errOnce sync.Once | ||
err error | ||
} | ||
|
||
// Submit adds a task to this group and sends it to the worker pool to be executed | ||
func (g *TaskGroupWithContext) Submit(task func() error) { | ||
g.waitGroup.Add(1) | ||
|
||
g.pool.Submit(func() { | ||
defer g.waitGroup.Done() | ||
|
||
// If context has already been cancelled, skip task execution | ||
if g.ctx != nil { | ||
select { | ||
case <-g.ctx.Done(): | ||
return | ||
default: | ||
} | ||
} | ||
|
||
// don't actually ignore errors | ||
err := task() | ||
if err != nil { | ||
g.errOnce.Do(func() { | ||
g.err = err | ||
if g.cancel != nil { | ||
g.cancel() | ||
} | ||
}) | ||
} | ||
}) | ||
} | ||
|
||
// Wait blocks until either all the tasks submitted to this group have completed, | ||
// one of them returned a non-nil error or the context associated to this group | ||
// was canceled. | ||
func (g *TaskGroupWithContext) Wait() error { | ||
|
||
// Wait for all tasks to complete | ||
tasksCompleted := make(chan struct{}) | ||
go func() { | ||
g.waitGroup.Wait() | ||
tasksCompleted <- struct{}{} | ||
}() | ||
|
||
select { | ||
case <-tasksCompleted: | ||
// If context was provided, cancel it to signal all running tasks to stop | ||
g.cancel() | ||
case <-g.ctx.Done(): | ||
} | ||
|
||
return g.err | ||
} |
Oops, something went wrong.