Skip to content

Commit c24faa0

Browse files
author
binbin
committed
i This is a combination of 2 commits.
xds circuit breaker max request
1 parent 9838ce9 commit c24faa0

File tree

157 files changed

+1148
-992
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

157 files changed

+1148
-992
lines changed

common/constant/key.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ const (
9393
TokenFilterKey = "token"
9494
TpsLimitFilterKey = "tps"
9595
TracingFilterKey = "tracing"
96+
XdsCircuitBreakerKey = "xds_circuit_reaker"
9697
)
9798

9899
const (

common/constant/xds.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package constant
219

320
const (

common/url.go

+18
Original file line numberDiff line numberDiff line change
@@ -875,3 +875,21 @@ func (c *URL) GetParamDuration(s string, d string) time.Duration {
875875
}
876876
return 3 * time.Second
877877
}
878+
879+
func GetSubscribeName(url *URL) string {
880+
var buffer bytes.Buffer
881+
882+
buffer.Write([]byte(DubboNodes[PROVIDER]))
883+
appendParam(&buffer, url, constant.InterfaceKey)
884+
appendParam(&buffer, url, constant.VersionKey)
885+
appendParam(&buffer, url, constant.GroupKey)
886+
return buffer.String()
887+
}
888+
889+
func appendParam(target *bytes.Buffer, url *URL, key string) {
890+
value := url.GetParam(key, "")
891+
target.Write([]byte(constant.NacosServiceNameSeparator))
892+
if strings.TrimSpace(value) != "" {
893+
target.Write([]byte(value))
894+
}
895+
}

filter/xds/cb/filter.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package cb
19+
20+
import (
21+
"context"
22+
)
23+
24+
import (
25+
"dubbo.apache.org/dubbo-go/v3/common"
26+
"dubbo.apache.org/dubbo-go/v3/common/constant"
27+
"dubbo.apache.org/dubbo-go/v3/common/extension"
28+
"dubbo.apache.org/dubbo-go/v3/common/logger"
29+
"dubbo.apache.org/dubbo-go/v3/filter"
30+
"dubbo.apache.org/dubbo-go/v3/protocol"
31+
"dubbo.apache.org/dubbo-go/v3/remoting/xds"
32+
"dubbo.apache.org/dubbo-go/v3/xds/client"
33+
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
34+
)
35+
36+
// this should be executed before users set their own Tracer
37+
func init() {
38+
extension.SetFilter(constant.XdsCircuitBreakerKey, newCircuitBreakerFilter)
39+
}
40+
41+
// if you wish to using opentracing, please add the this filter into your filter attribute in your configure file.
42+
// notice that this could be used in both client-side and server-side.
43+
type circuitBreakerFilter struct {
44+
client *xds.WrappedClient
45+
}
46+
47+
func (cb *circuitBreakerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
48+
url := invoker.GetURL()
49+
rejectedExeHandler := url.GetParam(constant.DefaultKey, constant.DefaultKey)
50+
clusterUpdate, err := cb.getClusterUpdate(url)
51+
if err != nil {
52+
logger.Errorf("xds circuitBreakerFilter get request counter fail", err)
53+
return nil
54+
}
55+
counter := client.GetClusterRequestsCounter(clusterUpdate.ClusterName, clusterUpdate.EDSServiceName)
56+
if err := counter.StartRequest(*clusterUpdate.MaxRequests); err != nil {
57+
rejectedExecutionHandler, err := extension.GetRejectedExecutionHandler(rejectedExeHandler)
58+
if err != nil {
59+
logger.Warn(err)
60+
} else {
61+
return rejectedExecutionHandler.RejectedExecution(url, invocation)
62+
}
63+
}
64+
return invoker.Invoke(ctx, invocation)
65+
}
66+
67+
func (cb *circuitBreakerFilter) OnResponse(ctx context.Context, result protocol.Result,
68+
invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
69+
url := invoker.GetURL()
70+
clusterUpdate, err := cb.getClusterUpdate(url)
71+
if err != nil {
72+
logger.Errorf("xds circuitBreakerFilter get request counter fail", err)
73+
return nil
74+
}
75+
counter := client.GetClusterRequestsCounter(clusterUpdate.ClusterName, clusterUpdate.EDSServiceName)
76+
counter.EndRequest()
77+
return result
78+
}
79+
80+
var circuitBreakerFilterInstance filter.Filter
81+
82+
func newCircuitBreakerFilter() filter.Filter {
83+
if circuitBreakerFilterInstance == nil {
84+
circuitBreakerFilterInstance = &circuitBreakerFilter{
85+
client: xds.GetXDSWrappedClient(),
86+
}
87+
}
88+
return circuitBreakerFilterInstance
89+
}
90+
91+
func (cb *circuitBreakerFilter) getClusterUpdate(url *common.URL) (resource.ClusterUpdate, error) {
92+
hostAddr, err := cb.client.GetHostAddrByServiceUniqueKey(common.GetSubscribeName(url))
93+
if err != nil {
94+
logger.Errorf("xds circuitBreakerFilter get GetHostAddrByServiceUniqueKey fail", err)
95+
return resource.ClusterUpdate{}, err
96+
}
97+
clusterUpdate := cb.client.GetClusterUpdateIgnoreVersion(hostAddr)
98+
return clusterUpdate, nil
99+
}

registry/event_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestKey(t *testing.T) {
3434
se := ServiceEvent{
3535
Service: u1,
3636
}
37-
assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0&timestamp=")
37+
assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0&timestamp=&meshClusterId=")
3838

3939
se2 := ServiceEvent{
4040
Service: u1,

remoting/xds/client.go

+36-7
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package xds
219

320
import (
421
"encoding/json"
522
"fmt"
623
"io/ioutil"
724
"net/http"
8-
"os"
925
"strings"
1026
"sync"
1127
"time"
@@ -173,7 +189,7 @@ func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr Addr) (*W
173189

174190
func (w *WrappedClient) getServiceUniqueKeyHostAddrMapFromPilot() (map[string]string, error) {
175191
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s:8080/debug/adsz", w.istiodPodIP), nil)
176-
token, err := os.ReadFile(istiodTokenPath)
192+
token, err := ioutil.ReadFile(istiodTokenPath)
177193
if err != nil {
178194
return nil, err
179195
}
@@ -229,12 +245,25 @@ func getHostNameAndPortFromAddr(hostAddr string) (string, string) {
229245
return hostName, port
230246
}
231247

248+
func (w *WrappedClient) GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate {
249+
hostName, port := getHostNameAndPortFromAddr(hostAddr)
250+
w.cdsMapLock.RLock()
251+
defer w.cdsMapLock.Unlock()
252+
for clusterName, v := range w.cdsMap {
253+
clusterNameData := strings.Split(clusterName, "|")
254+
if clusterNameData[1] == port && clusterNameData[3] == hostName {
255+
return v
256+
}
257+
}
258+
return resource.ClusterUpdate{}
259+
}
260+
232261
func (w *WrappedClient) getAllVersionClusterName(hostAddr string) []string {
233262
hostName, port := getHostNameAndPortFromAddr(hostAddr)
234263
allVersionClusterNames := make([]string, 0)
235264
w.cdsMapLock.RLock()
236265
defer w.cdsMapLock.Unlock()
237-
for clusterName, _ := range w.cdsMap {
266+
for clusterName := range w.cdsMap {
238267
clusterNameData := strings.Split(clusterName, "|")
239268
if clusterNameData[1] == port && clusterNameData[3] == hostName {
240269
allVersionClusterNames = append(allVersionClusterNames, clusterName)
@@ -296,7 +325,7 @@ func (w *WrappedClient) registerHostLevelSubscription(hostAddr, interfaceName, s
296325
w.hostAddrClusterCtxMapLock.Unlock()
297326

298327
oldlisteningClusterMap := make(map[string]bool)
299-
for cluster, _ := range listeningClustersCancelMap {
328+
for cluster := range listeningClustersCancelMap {
300329
oldlisteningClusterMap[cluster] = false
301330
}
302331
for _, updatedClusterName := range updatedAllVersionedClusterName {
@@ -485,7 +514,7 @@ func (w *WrappedClient) initClientAndLoadLocalHostAddr() error {
485514
// todo: what's going on? istiod can't discover istiod.istio-system.svc.cluster.local!!
486515
if clusterNameList[3] == w.istiodAddr.HostnameOrIP {
487516
// 1. find istiod podIP
488-
// todo: When would eds level watch be cancelled?
517+
// todo: When would eds level watch be canceled?
489518
cancel1 = xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
490519
if foundIstiod {
491520
return
@@ -502,7 +531,7 @@ func (w *WrappedClient) initClientAndLoadLocalHostAddr() error {
502531
return
503532
}
504533
// 2. found local hostAddr
505-
// todo: When would eds level watch be cancelled?
534+
// todo: When would eds level watch be canceled?
506535
cancel2 = xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
507536
if foundLocal {
508537
return
@@ -580,7 +609,7 @@ func getDubboGoMetadata(dubboGoMetadata string) *structpb.Struct {
580609
}
581610

582611
func (w *WrappedClient) runWatchingResource() {
583-
for _ = range w.cdsUpdateEventChan {
612+
for range w.cdsUpdateEventChan {
584613
w.cdsUpdateEventHandlersLock.RLock()
585614
for _, h := range w.cdsUpdateEventHandlers {
586615
h()

remoting/xds/debug.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package xds
219

320
import (

remoting/xds/ewatcher.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package xds
219

320
import (

remoting/xds/model.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package xds
219

320
import (

test/xds/main.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package main
219

320
import (

xds/balancer/balancer.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
/*
2-
*
3-
* Copyright 2020 gRPC authors.
4-
*
5-
* Licensed under the Apache License, Version 2.0 (the "License");
6-
* you may not use this file except in compliance with the License.
7-
* You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
88
*
99
* http://www.apache.org/licenses/LICENSE-2.0
1010
*
@@ -13,7 +13,6 @@
1313
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
16-
*
1716
*/
1817

1918
// Package balancer installs all the xds balancers.

xds/balancer/cdsbalancer/cdsbalancer.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
/*
2-
* Copyright 2019 gRPC authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
78
*
89
* http://www.apache.org/licenses/LICENSE-2.0
910
*

0 commit comments

Comments
 (0)