Skip to content

Commit

Permalink
change kafka config
Browse files Browse the repository at this point in the history
  • Loading branch information
why444216978 committed Jan 19, 2025
1 parent 5dcb835 commit a55b418
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 75 deletions.
77 changes: 14 additions & 63 deletions library/queue/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,31 @@ import (

type options struct {
logger logger.Logger
consumeLog bool
syncConfig *sarama.Config
asyncConfig *sarama.Config
consumeConfig *sarama.Config
config *sarama.Config
refreshInterval time.Duration
}

type OptionFunc func(*options)

func defaultOptions() *options {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Flush.Frequency = 500 * time.Millisecond
config.Producer.Return.Successes = true

return &options{
logger: zap.StdLogger,
consumeLog: false,
config: config,
refreshInterval: time.Minute,
}
}

func RefreshInterval(t time.Duration) OptionFunc {
func WithRefreshInterval(t time.Duration) OptionFunc {
return func(o *options) { o.refreshInterval = t }
}

func OpenConsumeLog() OptionFunc {
return func(o *options) { o.consumeLog = true }
}

func SyncConfig(c *sarama.Config) OptionFunc {
return func(o *options) { o.syncConfig = c }
}

func AsyncConfig(c *sarama.Config) OptionFunc {
return func(o *options) { o.asyncConfig = c }
}

func ConsumeConfig(c *sarama.Config) OptionFunc {
return func(o *options) { o.consumeConfig = c }
func WithConfig(c *sarama.Config) OptionFunc {
return func(o *options) { o.config = c }
}

type Client struct {
Expand Down Expand Up @@ -105,14 +95,12 @@ func (cli *Client) Consume(params interface{}) (err error) {
return errors.New("consumer is nil")
}

config := cli.newConsumeConfig()

addrs, err := cli.addrs()
if err != nil {
return
}

client, err := sarama.NewClient(addrs, config)
client, err := sarama.NewClient(addrs, cli.opts.config)
if err != nil {
return
}
Expand Down Expand Up @@ -197,7 +185,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
}()
_, retry, err = c.consumer(ctx, msg)
if err != nil && c.opts.consumeLog {
if err != nil {
c.opts.logger.Error(ctx, "kafkaConsumeRejectErr",
logger.Error(err),
logger.Reflect("retry", retry))
Expand Down Expand Up @@ -258,14 +246,12 @@ func (cli *Client) Shutdown() (err error) {
}

func (cli *Client) newAsyncProducer() (err error) {
config := cli.newAsyncConfig()

addrs, err := cli.addrs()
if err != nil {
return
}

if cli.asyncClient, err = sarama.NewClient(addrs, config); err != nil {
if cli.asyncClient, err = sarama.NewClient(addrs, cli.opts.config); err != nil {
return
}

Expand Down Expand Up @@ -294,14 +280,12 @@ func (cli *Client) newAsyncProducer() (err error) {
}

func (cli *Client) newSyncProducer() (err error) {
config := cli.newSyncConfig()

addrs, err := cli.addrs()
if err != nil {
return
}

if cli.syncClient, err = sarama.NewClient(addrs, config); err != nil {
if cli.syncClient, err = sarama.NewClient(addrs, cli.opts.config); err != nil {
return
}

Expand All @@ -314,39 +298,6 @@ func (cli *Client) newSyncProducer() (err error) {
return
}

func (cli *Client) newConsumeConfig() *sarama.Config {
if cli.opts != nil && cli.opts.consumeConfig != nil {
return cli.opts.consumeConfig
}

return sarama.NewConfig()
}

func (cli *Client) newAsyncConfig() *sarama.Config {
if cli.opts != nil && cli.opts.asyncConfig != nil {
return cli.opts.asyncConfig
}

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal // Only wait for the leader to ack
config.Producer.Flush.Frequency = 500 * time.Millisecond // Flush batches every 500ms

return config
}

func (cli *Client) newSyncConfig() *sarama.Config {
if cli.opts != nil && cli.opts.syncConfig != nil {
return cli.opts.syncConfig
}

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 3
config.Producer.Return.Successes = true

return config
}

func (cli *Client) refreshBrokers(client sarama.Client) {
go func() {
for range time.NewTicker(cli.opts.refreshInterval).C {
Expand Down
14 changes: 2 additions & 12 deletions library/queue/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ func TestProduce(t *testing.T) {
_ = cli.Shutdown()
}

func TestConfig(t *testing.T) {
cli := &Client{}
cli.newSyncConfig()
cli.newAsyncProducer()
cli.newConsumeConfig()
}

func newClient(t *testing.T, serviceName, group, topic string) *Client {
mockBroker := initMockBroker(t, serviceName, group, topic)
defer mockBroker.Close()
Expand Down Expand Up @@ -116,11 +109,8 @@ func newClient(t *testing.T, serviceName, group, topic string) *Client {
config.Consumer.Return.Errors = true
config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := New(serviceName,
SyncConfig(config),
AsyncConfig(config),
ConsumeConfig(config),
RefreshInterval(time.Second),
OpenConsumeLog(),
WithConfig(config),
WithRefreshInterval(time.Second),
)
return cli
}
Expand Down

0 comments on commit a55b418

Please sign in to comment.