-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
94 lines (82 loc) · 2.05 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package main
import (
"github.com/FedeDP/container-worker/pkg/container"
"github.com/FedeDP/container-worker/pkg/event"
"reflect"
"sync"
)
/*
#include <stdbool.h>
#include <stdlib.h>
typedef void (*async_cb)(const char *json, bool added, int async_id);
extern void makeCallback(const char *json, bool added, int async_id, async_cb cb) {
cb(json, added, async_id);
}
*/
import "C"
import (
"context"
)
const (
ctxDoneIdx = 0
inotifierIdx = 1
)
type asyncCb func(string, bool)
func workerLoop(ctx context.Context, cb asyncCb, containerEngines []container.Engine, inotifier *container.EngineInotifier) {
var (
evt event.Event
listenWg sync.WaitGroup
)
// We need to use a reflect.SelectCase here since
// we will need to select a variable number of channels
cases := make([]reflect.SelectCase, 0)
// Emplace back case for `ctx.Done` channel
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
})
// Emplace back case for inotifier channel if needed
inotifierCh := inotifier.Listen()
if inotifierCh != nil {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(inotifierCh),
})
}
// Emplace back cases for each container engine listener
for _, engine := range containerEngines {
ch, err := engine.Listen(ctx, &listenWg)
if err != nil {
continue
}
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
for {
chosen, val, _ := reflect.Select(cases)
if chosen == ctxDoneIdx {
// ctx.Done!
break
} else if inotifierCh != nil && chosen == inotifierIdx {
// inotifier!
engine := inotifier.Process(ctx, val.Interface())
if engine != nil {
ch, err := engine.Listen(ctx, &listenWg)
if err != nil {
continue
}
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
}
} else {
evt, _ = val.Interface().(event.Event)
cb(evt.String(), evt.IsCreate)
}
}
inotifier.Close()
listenWg.Wait()
}