Skip to content

Commit

Permalink
support conditional routing with multiple destinations, customize con…
Browse files Browse the repository at this point in the history
…ditional routing priorities and operation in route fail (apache#2685)
  • Loading branch information
YarBor authored Jun 6, 2024
1 parent 104fae2 commit d7ce4d4
Show file tree
Hide file tree
Showing 7 changed files with 547 additions and 86 deletions.
191 changes: 160 additions & 31 deletions cluster/router/condition/dynamic_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,218 @@
package condition

import (
"fmt"
"sort"
"strconv"
"strings"
"sync"
)

import (
"github.com/dubbogo/gost/log/logger"
)

import (
"dubbo.apache.org/dubbo-go/v3/cluster/utils"
"dubbo.apache.org/dubbo-go/v3/common"
conf "dubbo.apache.org/dubbo-go/v3/common/config"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/remoting"

"github.com/dubbogo/gost/log/logger"

"gopkg.in/yaml.v2"
)

type conditionRoute []*StateRouter

func (p conditionRoute) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 || len(p) == 0 {
return invokers
}
for _, router := range p {
invokers, _ = router.Route(invokers, url, invocation)
if len(invokers) == 0 {
break
}
}
return invokers
}

type multiplyConditionRoute []*StateRouter

func (m multiplyConditionRoute) route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 || len(m) == 0 {
return invokers
}
for _, router := range m {
matchInvokers, isMatch := router.Route(invokers, url, invocation)
if !isMatch || (len(matchInvokers) == 0 && !router.force) {
continue
}
return matchInvokers
}
return []protocol.Invoker{}
}

type condRouter interface {
route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker
}

type DynamicRouter struct {
conditionRouters []*StateRouter
routerConfig *config.RouterConfig
mu sync.RWMutex
force bool
enable bool
conditionRouter condRouter
}

func (d *DynamicRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if len(invokers) == 0 || len(d.conditionRouters) == 0 {
if len(invokers) == 0 {
return invokers
}

for _, router := range d.conditionRouters {
invokers = router.Route(invokers, url, invocation)
d.mu.RLock()
force, enable, cr := d.force, d.enable, d.conditionRouter
d.mu.RUnlock()

if !enable {
return invokers
}
if cr != nil {
res := cr.route(invokers, url, invocation)
if len(res) == 0 && !force {
return invokers
}
return res
} else {
return invokers
}
return invokers
}

func (d *DynamicRouter) URL() *common.URL {
return nil
}

func (d *DynamicRouter) Process(event *config_center.ConfigChangeEvent) {
d.mu.Lock()
defer d.mu.Unlock()

if event.ConfigType == remoting.EventTypeDel {
d.routerConfig = nil
d.conditionRouters = make([]*StateRouter, 0)
d.conditionRouter = nil
} else {
routerConfig, err := parseRoute(event.Value.(string))
rc, force, enable, err := generateCondition(event.Value.(string))
if err != nil {
logger.Warnf("[condition router]Build a new condition route config error, %+v and we will use the original condition rule configuration.", err)
return
logger.Errorf("generate condition error: %v", err)
d.conditionRouter = nil
} else {
d.force, d.enable, d.conditionRouter = force, enable, rc
}
}
}

/*
to check configVersion, here need decode twice.
From a performance perspective, decoding from a string and decoding from a map[string]interface{}
cost nearly same (a few milliseconds).
To keep the code simpler,
here use yaml-to-map and yaml-to-struct, not yaml-to-map-to-struct
generateCondition @return(router,force,enable,error)
*/
func generateCondition(rawConfig string) (condRouter, bool, bool, error) {
m := map[string]interface{}{}

err := yaml.Unmarshal([]byte(rawConfig), m)
if err != nil {
return nil, false, false, err
}

rawVersion, ok := m["configVersion"]
if !ok {
return nil, false, false, fmt.Errorf("miss `ConfigVersion` in %s", rawConfig)
}

version, ok := rawVersion.(string)
if !ok {
return nil, false, false, fmt.Errorf("`ConfigVersion` should be of type `string`, got %T", rawVersion)
}

v, parseErr := utils.ParseVersion(version)
if parseErr != nil {
return nil, false, false, fmt.Errorf("invalid version %s: %s", version, parseErr.Error())
}

switch {
case v.Equal(utils.V3_1) || v.Greater(utils.V3_1):
return generateMultiConditionRoute(rawConfig)
case v.Less(utils.V3_1):
return generateConditionsRoute(rawConfig)
default:
panic("invalid version compare return")
}
}

func generateMultiConditionRoute(rawConfig string) (multiplyConditionRoute, bool, bool, error) {
routerConfig, err := parseMultiConditionRoute(rawConfig)
if err != nil {
logger.Warnf("[condition router]Build a new condition route config error, %s and we will use the original condition rule configuration.", err.Error())
return nil, false, false, err
}

force, enable := routerConfig.Enabled, routerConfig.Force
if !enable {
return nil, false, false, nil
}

conditionRouters := make([]*StateRouter, 0, len(routerConfig.Conditions))
for _, conditionRule := range routerConfig.Conditions {
url, err := common.NewURL("condition://")
if err != nil {
return nil, false, false, err
}
d.routerConfig = routerConfig
conditions, err := generateConditions(d.routerConfig)
url.AddParam(constant.RuleKey, conditionRule.Rule)
url.AddParam(constant.ForceKey, strconv.FormatBool(conditionRule.Force))
url.AddParam(constant.PriorityKey, strconv.FormatInt(int64(conditionRule.Priority), 10))
conditionRoute, err := NewConditionStateRouter(url)
if err != nil {
logger.Warnf("[condition router]Build a new condition route config error, %+v and we will use the original condition rule configuration.", err)
return
return nil, false, false, err
}
d.conditionRouters = conditions
conditionRouters = append(conditionRouters, conditionRoute)
}

sort.Slice(conditionRouters, func(i, j int) bool {
return conditionRouters[i].priority > conditionRouters[j].priority
})
return conditionRouters, force, enable, nil
}

func generateConditions(routerConfig *config.RouterConfig) ([]*StateRouter, error) {
if routerConfig == nil {
return make([]*StateRouter, 0), nil
func generateConditionsRoute(rawConfig string) (conditionRoute, bool, bool, error) {
routerConfig, err := parseConditionRoute(rawConfig)
if err != nil {
logger.Warnf("[condition router]Build a new condition route config error, %s and we will use the original condition rule configuration.", err.Error())
return nil, false, false, err
}

force, enable := *routerConfig.Enabled, *routerConfig.Force
if !enable {
return nil, false, false, nil
}

conditionRouters := make([]*StateRouter, 0, len(routerConfig.Conditions))
for _, conditionRule := range routerConfig.Conditions {
url, err := common.NewURL("condition://")
if err != nil {
return nil, err
return nil, false, false, err
}
url.AddParam(constant.RuleKey, conditionRule)
url.AddParam(constant.ForceKey, strconv.FormatBool(*routerConfig.Force))
url.AddParam(constant.EnabledKey, strconv.FormatBool(*routerConfig.Enabled))
conditionRoute, err := NewConditionStateRouter(url)
if err != nil {
return nil, err
return nil, false, false, err
}
conditionRouters = append(conditionRouters, conditionRoute)
}
return conditionRouters, nil
return conditionRouters, force, enable, nil
}

// ServiceRouter is Service level router
Expand Down Expand Up @@ -142,7 +275,6 @@ type ApplicationRouter struct {
DynamicRouter
application string
currentApplication string
mu sync.Mutex
}

func NewApplicationRouter() *ApplicationRouter {
Expand Down Expand Up @@ -184,9 +316,6 @@ func (a *ApplicationRouter) Notify(invokers []protocol.Invoker) {
return
}

a.mu.Lock()
defer a.mu.Unlock()

if providerApplicaton != a.application {
if a.application != "" {
dynamicConfiguration.RemoveListener(strings.Join([]string{a.application, constant.ConditionRouterRuleSuffix}, ""), a)
Expand Down
75 changes: 32 additions & 43 deletions cluster/router/condition/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,14 @@ import (
)

var (
routePattern = regexp.MustCompile("([&!=,]*)\\s*([^&!=,\\s]+)")

illegalMsg = "Illegal route rule \"%s\", The error char '%s' before '%s'"

routePattern = regexp.MustCompile("([&!=,]*)\\s*([^&!=,\\s]+)")
illegalMsg = "Illegal route rule \"%s\", The error char '%s' before '%s'"
matcherFactories = make([]matcher.ConditionMatcherFactory, 0, 8)

once sync.Once
once sync.Once
)

type StateRouter struct {
enable bool
priority int64
force bool
url *common.URL
whenCondition map[string]matcher.Matcher
Expand All @@ -66,41 +63,38 @@ func NewConditionStateRouter(url *common.URL) (*StateRouter, error) {
}

force := url.GetParamBool(constant.ForceKey, false)
enable := url.GetParamBool(constant.EnabledKey, true)
priority := url.GetParamInt(constant.PriorityKey, constant.DefaultPriority)
c := &StateRouter{
url: url,
force: force,
enable: enable,
url: url,
force: force,
priority: priority,
}

if enable {
when, then, err := generateMatcher(url)
if err != nil {
return nil, err
}
c.whenCondition = when
c.thenCondition = then
when, then, err := generateMatcher(url)
if err != nil {
return nil, err
}
c.whenCondition = when
c.thenCondition = then
return c, nil
}

// Route Determine the target invokers list.
func (s *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if !s.enable {
return invokers
}

// condition rule like `self_condition => peers_condition `
//
// @return active_peers_invokers, Is_self_condition_match_success
func (s *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) ([]protocol.Invoker, bool) {
if len(invokers) == 0 {
return invokers
return invokers, false
}

if !s.matchWhen(url, invocation) {
return invokers
return invokers, false
}

if len(s.thenCondition) == 0 {
logger.Warn("condition state router thenCondition is empty")
return []protocol.Invoker{}
return []protocol.Invoker{}, true
}

var result = make([]protocol.Invoker, 0, len(invokers))
Expand All @@ -110,22 +104,7 @@ func (s *StateRouter) Route(invokers []protocol.Invoker, url *common.URL, invoca
}
}

if len(result) != 0 {
return result
} else if s.force {
logger.Warn("execute condition state router result list is empty. and force=true")
return result
}

return invokers
}

func (s *StateRouter) URL() *common.URL {
return s.url
}

func (s *StateRouter) Priority() int64 {
return 0
return result, true
}

func (s *StateRouter) matchWhen(url *common.URL, invocation protocol.Invocation) bool {
Expand Down Expand Up @@ -318,7 +297,7 @@ func (a byPriority) Len() int { return len(a) }
func (a byPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byPriority) Less(i, j int) bool { return a[i].Priority() < a[j].Priority() }

func parseRoute(routeContent string) (*config.RouterConfig, error) {
func parseConditionRoute(routeContent string) (*config.RouterConfig, error) {
routeDecoder := yaml.NewDecoder(strings.NewReader(routeContent))
routerConfig := &config.RouterConfig{}
err := routeDecoder.Decode(routerConfig)
Expand All @@ -327,3 +306,13 @@ func parseRoute(routeContent string) (*config.RouterConfig, error) {
}
return routerConfig, nil
}

func parseMultiConditionRoute(routeContent string) (*config.ConditionRouter, error) {
routeDecoder := yaml.NewDecoder(strings.NewReader(routeContent))
routerConfig := &config.ConditionRouter{}
err := routeDecoder.Decode(routerConfig)
if err != nil {
return nil, err
}
return routerConfig, nil
}
Loading

0 comments on commit d7ce4d4

Please sign in to comment.