forked from AliyunContainerService/pouch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrapper_client.go
87 lines (72 loc) · 2.03 KB
/
wrapper_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package ctrd
import (
"context"
"fmt"
"sync"
"github.com/containerd/containerd"
"github.com/pkg/errors"
)
// WrapperClient wrappers containerd grpc client,
// so that pouch daemon can holds a grpc client pool
// to improve grpc client performance.
type WrapperClient struct {
client *containerd.Client
// Lease is a new feature of containerd, We use it to avoid that the images
// are removed by garbage collection. If no lease is defined, the downloaded images will
// be removed automatically when the container is removed.
lease *containerd.Lease
mux sync.Mutex
// streamQuota records the numbers of stream client without be using
streamQuota int
}
func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int) (*WrapperClient, error) {
options := []containerd.ClientOpt{
containerd.WithDefaultNamespace(defaultns),
}
cli, err := containerd.New(rpcAddr, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to connect containerd")
}
// create a new lease or reuse the existed.
var lease containerd.Lease
leases, err := cli.ListLeases(context.TODO())
if err != nil {
return nil, err
}
if len(leases) != 0 {
lease = leases[0]
} else {
if lease, err = cli.CreateLease(context.TODO()); err != nil {
return nil, err
}
}
return &WrapperClient{
client: cli,
lease: &lease,
streamQuota: maxStreamsClient,
}, nil
}
// Produce is to release specified numbers of grpc stream client
// FIXME(ziren): if streamQuota greater than defaultMaxStreamsClient
// what to do ???
func (w *WrapperClient) Produce(v int) {
w.mux.Lock()
defer w.mux.Unlock()
w.streamQuota += v
}
// Consume is to acquire specified numbers of grpc stream client
func (w *WrapperClient) Consume(v int) error {
w.mux.Lock()
defer w.mux.Unlock()
if w.streamQuota < v {
return fmt.Errorf("quota is %d, less than %d, can not acquire", w.streamQuota, v)
}
w.streamQuota -= v
return nil
}
// Value is to get the quota
func (w *WrapperClient) Value() int {
w.mux.Lock()
defer w.mux.Unlock()
return w.streamQuota
}