-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexec.go
146 lines (126 loc) · 2.9 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package conexec
import (
"context"
"fmt"
"runtime/debug"
"sync"
"sync/atomic"
"time"
)
// wait waits for the notification of execution result
func wait(ctx context.Context, c TimedActuator,
resChan chan error, cancel context.CancelFunc) error {
if timeout := c.GetTimeout(); timeout != nil {
return waitWithTimeout(ctx, resChan, *timeout, cancel)
}
for {
select {
case <-ctx.Done():
return nil
case err := <-resChan:
if err != nil {
cancel()
return err
}
}
}
}
// waitWithTimeout waits for the notification of execution result
// when the timeout is set
func waitWithTimeout(ctx context.Context, resChan chan error,
timeout time.Duration, cancel context.CancelFunc) error {
for {
select {
case <-time.After(timeout):
cancel()
return ErrorTimeOut
case <-ctx.Done():
return nil
case err := <-resChan:
if err != nil {
cancel()
return err
}
}
}
}
// execTasks uses customized function to
// execute every task, such as using the simplyRun
func execTasks(parent context.Context, c TimedActuator,
execFunc func(f func()), tasks ...Task) error {
size := len(tasks)
if size == 0 {
return nil
}
ctx, cancel := context.WithCancel(parent)
resChan := make(chan error, size)
wg := &sync.WaitGroup{}
wg.Add(size)
// Make sure the tasks are completed and channel is closed
go func() {
wg.Wait()
cancel()
close(resChan)
}()
// Sadly we can not kill a goroutine manually
// So when an error happens, the other tasks will continue
// But the good news is that main progress
// will know the error immediately
for _, task := range tasks {
child, _ := context.WithCancel(ctx)
f := wrapperTask(child, task, wg, resChan)
execFunc(f)
}
return wait(ctx, c, resChan, cancel)
}
// simplyRun uses a new goroutine to run the function
func simplyRun(f func()) {
go f()
}
// Exec simply runs the tasks concurrently
// True will be returned is all tasks complete successfully
// otherwise false will be returned
func Exec(tasks ...Task) bool {
var c int32
wg := &sync.WaitGroup{}
wg.Add(len(tasks))
for _, t := range tasks {
go func(task Task) {
defer func() {
if r := recover(); r != nil {
atomic.StoreInt32(&c, 1)
fmt.Printf("conexec panic:%v\n%s\n", r, string(debug.Stack()))
}
wg.Done()
}()
if err := task(); err != nil {
atomic.StoreInt32(&c, 1)
}
}(t)
}
wg.Wait()
return c == 0
}
// ExecWithError simply runs the tasks concurrently
// nil will be returned is all tasks complete successfully
// otherwise custom error will be returned
func ExecWithError(tasks ...Task) error {
var err error
wg := &sync.WaitGroup{}
wg.Add(len(tasks))
for _, t := range tasks {
go func(task Task) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("conexec panic:%v\n%s\n", r, string(debug.Stack()))
}
wg.Done()
}()
if e := task(); e != nil {
err = e
}
}(t)
}
wg.Wait()
return err
}