forked from kelseyhightower/confd
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
169 lines (147 loc) · 3.84 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package etcd
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"net/http"
"strings"
"time"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
"github.com/kelseyhightower/confd/log"
)
// Client is a wrapper around the etcd keys API. It holds the client.KeysAPI
// handle through which GetValues and WatchPrefix issue their requests.
type Client struct {
// client is the etcd KeysAPI used for all reads and watches.
client client.KeysAPI
}
// NewEtcdClient returns a *Client whose KeysAPI is configured for the given
// etcd machines.
//
// machines lists the etcd endpoints. cert and key are paths to a PEM client
// certificate and its private key; the pair is loaded only when both are
// non-empty. caCert is the path to a PEM bundle used as the root CA pool for
// verifying the server. clientInsecure disables server certificate
// verification (a warning is logged). username and password are applied only
// when basicAuth is true.
//
// An error is returned when the CA file cannot be read, the key pair cannot
// be loaded, or the etcd client cannot be created. In those cases the
// returned *Client is non-nil but wraps a nil KeysAPI and must not be used.
func NewEtcdClient(machines []string, cert, key, caCert string, clientInsecure bool, basicAuth bool, username string, password string) (*Client, error) {
	// Enable client insecure mode globally
	if clientInsecure {
		log.Warning("TLS Client config running insecure mode. Skip server CA verification.")
	}
	tlsConfig := &tls.Config{
		InsecureSkipVerify: clientInsecure,
	}

	if caCert != "" {
		certBytes, err := ioutil.ReadFile(caCert)
		if err != nil {
			return &Client{}, err
		}
		caCertPool := x509.NewCertPool()
		if caCertPool.AppendCertsFromPEM(certBytes) {
			tlsConfig.RootCAs = caCertPool
		} else {
			// Leaving RootCAs nil makes crypto/tls use the host's root CA
			// set. Say so, instead of silently ignoring the configured file.
			log.Warning("CA certificate file contains no valid PEM certificates. Falling back to the default root CAs.")
		}
	}

	if cert != "" && key != "" {
		tlsCert, err := tls.LoadX509KeyPair(cert, key)
		if err != nil {
			return &Client{}, err
		}
		tlsConfig.Certificates = []tls.Certificate{tlsCert}
	}

	transport := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		Dial: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).Dial,
		TLSHandshakeTimeout: 10 * time.Second,
		TLSClientConfig:     tlsConfig,
	}

	cfg := client.Config{
		Endpoints:               machines,
		HeaderTimeoutPerRequest: time.Duration(3) * time.Second,
		Transport:               transport,
	}
	if basicAuth {
		cfg.Username = username
		cfg.Password = password
	}

	c, err := client.New(cfg)
	if err != nil {
		return &Client{}, err
	}
	return &Client{client.NewKeysAPI(c)}, nil
}
// GetValues looks up each entry of keys in etcd with a recursive, sorted,
// quorum read and flattens the results into a map from node key to node
// value. Directory nodes contribute only their descendants.
//
// The first failing lookup stops the iteration; the values gathered up to
// that point are returned together with the error.
func (c *Client) GetValues(keys []string) (map[string]string, error) {
	values := make(map[string]string)
	for i := range keys {
		opts := &client.GetOptions{Recursive: true, Sort: true, Quorum: true}
		resp, err := c.client.Get(context.Background(), keys[i], opts)
		if err != nil {
			return values, err
		}
		if err := nodeWalk(resp.Node, values); err != nil {
			return values, err
		}
	}
	return values, nil
}
// nodeWalk recursively descends node and records the value of every
// non-directory node in vars, keyed by the node's Key. A nil node is ignored.
//
// It currently always returns nil, but an error from a nested call is
// propagated rather than dropped so that any future failure surfaces to the
// caller.
func nodeWalk(node *client.Node, vars map[string]string) error {
	if node == nil {
		return nil
	}
	if !node.Dir {
		vars[node.Key] = node.Value
		return nil
	}
	for _, child := range node.Nodes {
		if err := nodeWalk(child, vars); err != nil {
			return err
		}
	}
	return nil
}
// WatchPrefix blocks until etcd reports a change under prefix whose node key
// starts with one of keys, and returns that node's ModifiedIndex.
//
// A waitIndex of 0 returns (1, nil) immediately so that the caller performs
// an initial retrieval from the store. If the watch fails with etcd error
// code 401 (etcd's "event index cleared" code), (0, nil) is returned, so a
// caller that feeds the result back in as waitIndex takes that same
// immediate-return path on its next call. Any other error, including the one
// produced when the watch is cancelled through stopChan, is returned together
// with the caller's waitIndex.
//
// A receive on stopChan (a sent value or a close) cancels the pending watch.
func (c *Client) WatchPrefix(prefix string, keys []string, waitIndex uint64, stopChan chan bool) (uint64, error) {
	// return something > 0 to trigger a key retrieval from the store
	if waitIndex == 0 {
		return 1, nil
	}

	// Setting AfterIndex to 0 (default) means that the Watcher
	// should start watching for events starting at the current
	// index, whatever that may be.
	watcher := c.client.Watcher(prefix, &client.WatcherOptions{AfterIndex: uint64(0), Recursive: true})

	// Cancel on every return path: this releases the context and also ends
	// the helper goroutine below, which waits on ctx.Done().
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		select {
		case <-stopChan:
			cancel()
		case <-ctx.Done():
		}
	}()

	for {
		resp, err := watcher.Next(ctx)
		if err != nil {
			// The etcd client reports API errors as client.Error values;
			// the pointer form is matched as well for safety.
			switch e := err.(type) {
			case client.Error:
				if e.Code == 401 {
					return 0, nil
				}
			case *client.Error:
				if e != nil && e.Code == 401 {
					return 0, nil
				}
			}
			return waitIndex, err
		}

		// Only return if we have a key prefix we care about.
		// This is not an exact match on the key so there is a chance
		// we will still pickup on false positives. The net win here
		// is reducing the scope of keys that can trigger updates.
		for _, k := range keys {
			if strings.HasPrefix(resp.Node.Key, k) {
				return resp.Node.ModifiedIndex, nil
			}
		}
	}
}