forked from hashicorp/consul
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfailover_policy.go
158 lines (134 loc) · 4.58 KB
/
failover_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
"fmt"
envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
)
type discoChainTargets struct {
baseClusterName string
targets []targetInfo
failover bool
failoverPolicy structs.ServiceResolverFailoverPolicy
}
type targetInfo struct {
TargetID string
TLSContext *envoy_tls_v3.UpstreamTlsContext
// Region is the region from the failover target's Locality. nil means the
// target is in the local Consul cluster.
Region *string
}
type discoChainTargetGroup struct {
Targets []targetInfo
ClusterName string
}
func (ft discoChainTargets) groupedTargets() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
if !ft.failover {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: ft.baseClusterName,
Targets: ft.targets,
})
return targetGroups, nil
}
switch ft.failoverPolicy.Mode {
case "sequential", "":
return ft.sequential()
case "order-by-locality":
return ft.orderByLocality()
default:
return targetGroups, fmt.Errorf("unexpected failover policy")
}
}
func (s *ResourceGenerator) mapDiscoChainTargets(cfgSnap *proxycfg.ConfigSnapshot, chain *structs.CompiledDiscoveryChain, node *structs.DiscoveryGraphNode, upstreamConfig structs.UpstreamConfig, forMeshGateway bool) (discoChainTargets, error) {
failoverTargets := discoChainTargets{}
if node.Resolver == nil {
return discoChainTargets{}, fmt.Errorf("impossible to process a non-resolver node")
}
primaryTargetID := node.Resolver.Target
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
if err != nil && !forMeshGateway {
return discoChainTargets{}, err
}
failoverTargets.baseClusterName = s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway, false)
tids := []string{primaryTargetID}
failover := node.Resolver.Failover
if failover != nil && !forMeshGateway {
tids = append(tids, failover.Targets...)
failoverTargets.failover = true
if failover.Policy == nil {
failoverTargets.failoverPolicy = structs.ServiceResolverFailoverPolicy{}
} else {
failoverTargets.failoverPolicy = *failover.Policy
}
}
for _, tid := range tids {
target := chain.Targets[tid]
var sni, rootPEMs string
var spiffeIDs []string
targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
ti := targetInfo{TargetID: tid}
configureTLS := true
if forMeshGateway {
// We only initiate TLS if we're doing an L7 proxy.
configureTLS = structs.IsProtocolHTTPLike(upstreamConfig.Protocol)
}
if !configureTLS {
failoverTargets.targets = append(failoverTargets.targets, ti)
continue
}
if targetUID.Peer != "" {
tbs, _ := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID)
if !found {
s.Logger.Warn("failed to fetch upstream peering metadata", "target", targetUID)
continue
}
sni = peerMeta.PrimarySNI()
spiffeIDs = peerMeta.SpiffeID
region := target.Locality.GetRegion()
ti.Region = ®ion
} else {
sni = target.SNI
rootPEMs = cfgSnap.RootPEMs()
spiffeIDs = []string{connect.SpiffeIDService{
Host: cfgSnap.Roots.TrustDomain,
Namespace: target.Namespace,
Partition: target.Partition,
Datacenter: target.Datacenter,
Service: target.Service,
}.URI().String()}
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)
err := injectSANMatcher(commonTLSContext, spiffeIDs...)
if err != nil {
return failoverTargets, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err)
}
tlsContext := &envoy_tls_v3.UpstreamTlsContext{
CommonTlsContext: commonTLSContext,
Sni: sni,
}
ti.TLSContext = tlsContext
failoverTargets.targets = append(failoverTargets.targets, ti)
}
return failoverTargets, nil
}
func (ft discoChainTargets) sequential() ([]discoChainTargetGroup, error) {
var targetGroups []discoChainTargetGroup
for i, t := range ft.targets {
targetGroups = append(targetGroups, discoChainTargetGroup{
ClusterName: fmt.Sprintf("%s%d~%s", xdscommon.FailoverClusterNamePrefix, i, ft.baseClusterName),
Targets: []targetInfo{t},
})
}
return targetGroups, nil
}