Skip to content

Commit

Permalink
add mq.delay
Browse files Browse the repository at this point in the history
  • Loading branch information
brzyangg committed Feb 13, 2020
1 parent 87d8bae commit f0ae302
Show file tree
Hide file tree
Showing 12 changed files with 778 additions and 36 deletions.
8 changes: 3 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ module github.com/shawnfeng/sutil
require (
github.com/ZhengHe-MD/agollo/v4 v4.1.3
github.com/ZhengHe-MD/properties v0.2.1
github.com/bitly/go-simplejson v0.4.4-0.20140701141959-3378bdcb5ceb
github.com/bitly/go-simplejson v0.5.0
github.com/coreos/etcd v3.3.17+incompatible
github.com/fzzy/radix v0.4.9-0.20141113025130-a3a55de9c594
github.com/go-redis/redis v6.15.1+incompatible
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.0
github.com/jinzhu/gorm v1.9.10
github.com/jmoiron/sqlx v0.0.0-20170430194603-d9bd385d68c0
github.com/jmoiron/sqlx v1.2.0
github.com/julienschmidt/httprouter v1.2.0
github.com/kaneshin/go-pkg v0.0.0-20150919125626-a8e1479186cf
github.com/lib/pq v1.1.1
Expand All @@ -22,15 +22,13 @@ require (
github.com/sdming/gosnow v0.0.0-20130403030620-3a05c415e886
github.com/segmentio/kafka-go v0.3.4
github.com/shawnfeng/lumberjack.v2 v0.0.0-20181226094728-63d76296ede8
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
github.com/stretchrcom/testify v1.2.2 // indirect
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec
gitlab.pri.ibanyu.com/middleware/delayqueue v0.0.0-20200213090847-cd24af2bd1f2
gitlab.pri.ibanyu.com/middleware/seaweed v1.0.20
go.uber.org/zap v1.10.0

gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect

)
124 changes: 124 additions & 0 deletions go.sum

Large diffs are not rendered by default.

74 changes: 60 additions & 14 deletions mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/shawnfeng/sutil/sconf/center"
"github.com/shawnfeng/sutil/scontext"
"github.com/shawnfeng/sutil/slog/slog"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -20,12 +21,15 @@ type MQType int

const (
MQTypeKafka MQType = iota
MQTypeDelay
)

func (t MQType) String() string {
switch t {
case MQTypeKafka:
return "kafka"
case MQTypeDelay:
return "delay"
default:
return ""
}
Expand Down Expand Up @@ -54,6 +58,9 @@ func (c ConfigerType) String() string {

const (
defaultTimeout = 3 * time.Second
defaultTTR = 3600 // 1 hour
defaultTTL = 3600 * 24 // 1 day
defaultTries = 1
)

type Config struct {
Expand All @@ -64,6 +71,9 @@ type Config struct {
CommitInterval time.Duration
Offset int64
OffsetAt string
TTR uint32 // time to run
TTL uint32 // time to live
Tries uint16 // delay tries
}

type KeyParts struct {
Expand All @@ -75,7 +85,7 @@ var DefaultConfiger Configer

type Configer interface {
Init(ctx context.Context) error
GetConfig(ctx context.Context, topic string) (*Config, error)
GetConfig(ctx context.Context,topic string, mqType MQType) (*Config, error)
ParseKey(ctx context.Context, k string) (*KeyParts, error)
Watch(ctx context.Context) <-chan *center.ChangeEvent
}
Expand Down Expand Up @@ -110,12 +120,12 @@ func (m *SimpleConfig) Init(ctx context.Context) error {
return nil
}

func (m *SimpleConfig) GetConfig(ctx context.Context, topic string) (*Config, error) {
func (m *SimpleConfig) GetConfig(ctx context.Context,topic string, mqType MQType) (*Config, error) {
fun := "SimpleConfig.GetConfig-->"
slog.Infof(ctx, "%s get simple config topic:%s", fun, topic)

return &Config{
MQType: MQTypeKafka,
MQType: mqType,
MQAddr: m.mqAddr,
Topic: topic,
TimeOut: defaultTimeout,
Expand Down Expand Up @@ -153,7 +163,7 @@ func (m *EtcdConfig) Init(ctx context.Context) error {
return nil
}

func (m *EtcdConfig) GetConfig(ctx context.Context, topic string) (*Config, error) {
func (m *EtcdConfig) GetConfig(ctx context.Context,topic string, mqType MQType) (*Config, error) {
fun := "EtcdConfig.GetConfig-->"
slog.Infof(ctx, "%s get etcd config topic:%s", fun, topic)
// TODO
Expand All @@ -177,6 +187,9 @@ const (
apolloBrokersSep = ","
apolloBrokersKey = "brokers"
apolloOffsetAtKey = "offsetat"
apolloTTRKey = "ttr"
apolloTTLKey = "ttl"
apolloTriesKey = "tries"
)

type ApolloConfig struct {
Expand Down Expand Up @@ -221,20 +234,20 @@ func (s simpleContextControlRouter) SetControlRouteGroup(group string) error {
return nil
}

func (m *ApolloConfig) getConfigItemWithFallback(ctx context.Context, topic string, name string) (string, bool) {
val, ok := m.center.GetStringWithNamespace(ctx, center.DefaultApolloMQNamespace, m.buildKey(ctx, topic, name))
func (m *ApolloConfig) getConfigItemWithFallback(ctx context.Context, topic string, name string, mqType MQType) (string, bool) {
val, ok := m.center.GetStringWithNamespace(ctx, center.DefaultApolloMQNamespace, m.buildKey(ctx, topic, name, mqType))
if !ok {
defaultCtx := context.WithValue(ctx, scontext.ContextKeyControl, simpleContextControlRouter{defaultRouteGroup})
val, ok = m.center.GetStringWithNamespace(defaultCtx, center.DefaultApolloMQNamespace, m.buildKey(defaultCtx, topic, name))
val, ok = m.center.GetStringWithNamespace(defaultCtx, center.DefaultApolloMQNamespace, m.buildKey(defaultCtx, topic, name, mqType))
}
return val, ok
}

func (m *ApolloConfig) GetConfig(ctx context.Context, topic string) (*Config, error) {
func (m *ApolloConfig) GetConfig(ctx context.Context,topic string, mqType MQType) (*Config, error) {
fun := "ApolloConfig.GetConfig-->"
slog.Infof(ctx, "%s get mq config topic:%s", fun, topic)

brokersVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloBrokersKey)
brokersVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloBrokersKey, mqType)
if !ok {
return nil, fmt.Errorf("%s no brokers config found", fun)
}
Expand All @@ -248,21 +261,54 @@ func (m *ApolloConfig) GetConfig(ctx context.Context, topic string) (*Config, er

slog.Infof(ctx, "%s got config brokers:%s", fun, brokers)

offsetAtVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloOffsetAtKey)
offsetAtVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloOffsetAtKey, mqType)
if !ok {
slog.Infof(ctx, "%s no offsetAtVal config founds", fun)

}
slog.Infof(ctx, "%s got config offsetAt:%s", fun, offsetAtVal)

ttrVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloTTRKey, mqType)
if !ok {
slog.Infof(ctx, "%s no ttrVal config founds", fun)
}
ttr, err := strconv.ParseUint(ttrVal, 10, 32)
if err != nil {
ttr = defaultTTR
}
slog.Infof(ctx, "%s got config TTR:%d", fun, ttr)

ttlVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloTTLKey, mqType)
if !ok {
slog.Infof(ctx, "%s no ttlVal config founds", fun)
}
ttl, err := strconv.ParseUint(ttlVal, 10, 32)
if err != nil {
ttl = defaultTTL
}
slog.Infof(ctx, "%s got config TTL:%d", fun, ttl)

triesVal, ok := m.getConfigItemWithFallback(ctx, topic, apolloTriesKey, mqType)
if !ok {
slog.Infof(ctx, "%s no triesVal config founds", fun)
}
tries, err := strconv.ParseUint(triesVal, 10, 16)
if err != nil {
tries = defaultTries
}
slog.Infof(ctx, "%s got config triesVal:%s", fun, triesVal)

return &Config{
MQType: MQTypeKafka,
MQType: mqType,
MQAddr: brokers,
Topic: topic,
TimeOut: defaultTimeout,
CommitInterval: 1 * time.Second,
Offset: FirstOffset,
OffsetAt: offsetAtVal,
TTR: uint32(ttr),
TTL: uint32(ttl),
Tries: uint16(tries),
}, nil
}

Expand Down Expand Up @@ -294,7 +340,7 @@ func (ob *apolloObserver) HandleChangeEvent(event *center.ChangeEvent) {
// TODO: filter different mq types
var changes = map[string]*center.Change{}
for k, ce := range event.Changes {
if strings.Contains(k, fmt.Sprint(MQTypeKafka)) {
if strings.Contains(k, fmt.Sprint(MQTypeKafka)) || strings.Contains(k, fmt.Sprint(MQTypeDelay)) {
changes[k] = ce
}
}
Expand All @@ -314,11 +360,11 @@ func (m *ApolloConfig) Watch(ctx context.Context) <-chan *center.ChangeEvent {
return m.ch
}

func (m *ApolloConfig) buildKey(ctx context.Context, topic, item string) string {
func (m *ApolloConfig) buildKey(ctx context.Context, topic, item string, mqType MQType) string {
return strings.Join([]string{
topic,
scontext.GetControlRouteGroupWithDefault(ctx, defaultRouteGroup),
fmt.Sprint(MQTypeKafka),
fmt.Sprint(mqType),
item,
}, apolloConfigSep)
}
46 changes: 37 additions & 9 deletions mq/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func TestApolloConfig_GetConfig(t *testing.T) {
ctx := context.TODO()

conf := NewApolloConfiger()

conf.Init(ctx)
t.Run("valid topic", func(t *testing.T) {
topic := defaultTestTopic
config, err := conf.GetConfig(ctx, topic)
config, err := conf.GetConfig(ctx, topic, MQTypeKafka)
assert.Equal(t, err, nil)
assert.Equal(t, config.MQType, MQTypeKafka)
assert.Equal(t, config.Topic, topic)
Expand All @@ -49,10 +49,19 @@ func TestApolloConfig_GetConfig(t *testing.T) {

t.Run("invalid topic", func(t *testing.T) {
topic := "topic.never.exist"
config, err := conf.GetConfig(ctx, topic)
config, err := conf.GetConfig(ctx, topic, MQTypeKafka)
assert.True(t, config == nil)
assert.NotEqual(t, err, nil)
})

t.Run("delay topic", func(t *testing.T) {
topic := defaultTestTopic
config, err := conf.GetConfig(ctx, topic, MQTypeDelay)
assert.Equal(t, err, nil)
assert.Equal(t, config.MQType, MQTypeDelay)
assert.Equal(t, config.Topic, topic)
assert.True(t, len(config.MQAddr) > 0)
})
}

func TestApolloConfig_buildKey(t *testing.T) {
Expand All @@ -63,16 +72,19 @@ func TestApolloConfig_buildKey(t *testing.T) {
topic string
item string
expectedString string
mqType MQType
}{
{defaultTestTopic, "brokers", fmt.Sprintf("%s.default.kafka.brokers", defaultTestTopic)},
{"topic", "timeout", "topic.default.kafka.timeout"},
{defaultTestTopic, "brokers", fmt.Sprintf("%s.default.kafka.brokers", defaultTestTopic), MQTypeKafka},
{"topic", "timeout", "topic.default.kafka.timeout", MQTypeKafka},
{defaultTestTopic, "brokers", fmt.Sprintf("%s.default.delay.brokers", defaultTestTopic), MQTypeDelay},
}

for _, c := range cases {
assert.Equal(t, conf.buildKey(ctx, c.topic, c.item), c.expectedString)
assert.Equal(t, conf.buildKey(ctx, c.topic, c.item, c.mqType), c.expectedString)
}
}


func TestApolloConfig_ParseKey(t *testing.T) {
ctx := context.TODO()
conf := NewApolloConfiger()
Expand Down Expand Up @@ -102,6 +114,11 @@ func TestApolloConfig_ParseKey(t *testing.T) {
true,
nil,
},
{
"base.changeboard.event.default.delay.brokers",
false,
&KeyParts{"base.changeboard.event", "default"},
},
}

for _, c := range cases {
Expand All @@ -116,7 +133,7 @@ func TestApolloConfig_getConfigItemWithFallback(t *testing.T) {
ctx := context.TODO()
conf := NewApolloConfiger()

brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey)
brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey, MQTypeKafka)
assert.True(t, ok)
assert.True(t, len(brokersVal) > 0, "got brokers:", brokersVal)
slog.Infof(ctx, "got brokers:%s", brokersVal)
Expand All @@ -128,7 +145,7 @@ func TestApolloConfig_getConfigItemWithFallback(t *testing.T) {

conf := NewApolloConfiger()

brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey)
brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey, MQTypeKafka)
assert.True(t, ok)
assert.True(t, len(brokersVal) > 0, "got brokers:", brokersVal)
slog.Infof(ctx, "got brokers:%s", brokersVal)
Expand All @@ -140,7 +157,18 @@ func TestApolloConfig_getConfigItemWithFallback(t *testing.T) {

conf := NewApolloConfiger()

brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey)
brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey, MQTypeKafka)
assert.True(t, ok)
assert.True(t, len(brokersVal) > 0, "got brokers:", brokersVal)
slog.Infof(ctx, "got brokers:%s", brokersVal)
})

t.Run("empty ctx should get default delay value", func(t *testing.T) {
ctx := context.TODO()
conf := NewApolloConfiger()
conf.Init(ctx)

brokersVal, ok := conf.getConfigItemWithFallback(ctx, defaultTestTopic, apolloBrokersKey, MQTypeDelay)
assert.True(t, ok)
assert.True(t, len(brokersVal) > 0, "got brokers:", brokersVal)
slog.Infof(ctx, "got brokers:%s", brokersVal)
Expand Down
Loading

0 comments on commit f0ae302

Please sign in to comment.