
Merge pull request 3Xpl0it3r#3 from daixiansuo/clickpaas
Modify configuration
3Xpl0it3r authored Apr 28, 2022
2 parents b9e24ba + 5393643 commit 5f245f7
Showing 4 changed files with 15 additions and 14 deletions.
clients/cmd/promtail/promtail-local-config.yaml (1 addition, 3 deletions)
@@ -3,10 +3,8 @@ clients:
       path: /tmp/promtail/log
   - kafka:
       batch_size: 2097152
-      url: "172.35.88.54:9092"
+      url: "192.168.246.85:9092"
       producer_max_message_size: 2097152
       consumer_fetch_max_size: 2097152
-
-
 positions:
   filename: /tmp/positions.yaml
clients/pkg/promtail/client/kafka/batch.go (4 additions, 1 deletion)
@@ -125,6 +125,7 @@ type batch struct {
 	bytes        int
 	createdAt    time.Time
 	kafkaStreams map[string]*kafkaStream
+	numEntries   int
 }

 func newBatch(entries ...api.Entry) *batch {
@@ -133,6 +134,7 @@ func newBatch(entries ...api.Entry) *batch {
 		kafkaStreams: map[string]*kafkaStream{},
 		bytes:        0,
 		createdAt:    time.Now(),
+		numEntries:   0,
 	}

 	// Add entries to the batch
@@ -150,6 +152,7 @@ func (b *batch) add(entry api.Entry) {
 		return
 	}
 	b.bytes += entry.Size()
+	b.numEntries += 1
 	// Append the entry to an already existing stream (if any)
 	labels := labelsMapToString(entry.Labels, ReservedLabelTenantID)
 	if streams, ok := b.kafkaStreams[labels]; ok {
@@ -197,7 +200,7 @@ func (b *batch) age() time.Duration {
 // the encoded bytes and the number of encoded entries
 func (b *batch) encode() ([]*kafka.ProducerMessage, int, error) {
 	var (
-		requests     []*kafka.ProducerMessage = make([]*kafka.ProducerMessage, len(b.kafkaStreams))
+		requests     []*kafka.ProducerMessage = make([]*kafka.ProducerMessage, 0, b.numEntries)
 		entriesCount int                      = 0
 	)
 	for _, streams := range b.kafkaStreams {
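The requests fix in the last hunk is the classic Go length-versus-capacity slip: make([]*kafka.ProducerMessage, len(b.kafkaStreams)) returns a slice that already contains that many nil elements, so the append calls that follow grow it past them and the encoded batch leads with nil messages. make(..., 0, b.numEntries) pre-sizes only the capacity. A minimal, self-contained sketch of the difference, with a hypothetical item type standing in for *kafka.ProducerMessage:

package main

import "fmt"

// item stands in for *kafka.ProducerMessage in this sketch.
type item struct{ payload string }

func main() {
	src := []string{"a", "b", "c"}

	// Buggy pattern: non-zero length plus append.
	// buggy starts as [nil nil nil]; append grows it to length 6.
	buggy := make([]*item, len(src))
	for _, s := range src {
		buggy = append(buggy, &item{payload: s})
	}
	fmt.Println(len(buggy), buggy[0] == nil) // 6 true

	// Fixed pattern, as in the commit: zero length, pre-sized capacity.
	fixed := make([]*item, 0, len(src))
	for _, s := range src {
		fixed = append(fixed, &item{payload: s})
	}
	fmt.Println(len(fixed), fixed[0] == nil) // 3 false
}

Sizing the capacity by the new numEntries counter rather than by stream count also suggests encode emits one producer message per log entry, so the pre-allocation now matches the final length; that reading is inferred from this diff alone, not from the full file.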
clients/pkg/promtail/client/kafka/client.go (10 additions, 6 deletions)
@@ -23,7 +23,6 @@ const (
 	HostLabel = "host"
 )

-
 type metrics struct {
 	sendTotalKafkaMessages *prometheus.CounterVec // Kafka entries successfully sent
 	droppedEntry           *prometheus.CounterVec // how many entries were dropped (invalid entries)
@@ -109,12 +108,10 @@ func NewKafkaClient(reg prometheus.Registerer, cfg KafkaConfig, logger log.Logger
 func newKafkaProducer(cfg *KafkaConfig) (kafka.SyncProducer, error) {
 	config := kafka.NewConfig()
 	// 1MB = 1048576 10MB = 10485760
-	config.Producer.MaxMessageBytes = 10485760
+	config.Producer.MaxMessageBytes = cfg.ProducerMaxMessageSize
 	config.Producer.Timeout = cfg.Timeout
 	config.Producer.Return.Successes = true
 	config.Producer.Partitioner = kafka.NewRandomPartitioner
-	config.Consumer.Fetch.Default = 10485760
-	config.Consumer.Fetch.Max = 10485760
 	return kafka.NewSyncProducer([]string{cfg.Url}, config)
 }

@@ -144,9 +141,16 @@ func (c *client) run() {
 	for {
 		select {
 		case e, ok := <-c.entries:
-			if !ok || !c.validateEntry(&e){
+			if !ok {
 				return
 			}
+
+			// filter message size
+			if !c.validateEntry(&e) {
+				level.Error(c.logger).Log("msg", "message size exceeds limit", "max size: ", c.cfg.ProducerMaxMessageSize, "message size:", len(e.Line))
+				return
+			}
+
 			// entry is {{labels map}, lines, timestamp}
 			e, tenantId := c.processEntry(e)
 			batch, ok := batches[tenantId]
@@ -243,7 +247,7 @@ func (c *client) send(messages []*kafka.ProducerMessage) (int, error) {
 }

 // Filter the entry: if the entry line exceeds the maximum threshold, the log is dropped outright (dropped only on the Kafka client side)
-func (c *client)validateEntry(e *api.Entry) bool {
+func (c *client) validateEntry(e *api.Entry) bool {
 	if len(e.Line) > c.cfg.ProducerMaxMessageSize {
 		return false
 	}
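Two things change in this file. First, newKafkaProducer now takes MaxMessageBytes from cfg.ProducerMaxMessageSize instead of a hard-coded 10 MiB, and the two Consumer.Fetch settings are dropped; the kafka package exposes a sarama-style API here, and a producer-only client has no use for consumer fetch sizes. Second, the size check is split out of the channel-closed check in run(). One caveat worth noting: the new branch ends in return, which exits the whole send loop the first time one oversized line arrives, whereas dropping only that entry would use continue. A minimal sketch of the loop shape under that assumption, with stub types and no Kafka dependency (maxSize plays the role of cfg.ProducerMaxMessageSize):

package main

import "fmt"

type entry struct{ line string }

// drain mimics the shape of (*client).run: receive entries until the channel
// closes, skip any line longer than maxSize, and count the rest.
func drain(entries <-chan entry, maxSize int) (kept, dropped int) {
	for e := range entries {
		if len(e.line) > maxSize {
			dropped++
			continue // drop only this entry; a bare return here would abandon the channel
		}
		kept++
	}
	return kept, dropped
}

func main() {
	ch := make(chan entry, 3)
	ch <- entry{line: "short"}
	ch <- entry{line: "this line is far longer than the configured limit"}
	ch <- entry{line: "also short"}
	close(ch)

	kept, dropped := drain(ch, 16)
	fmt.Printf("kept=%d dropped=%d\n", kept, dropped) // kept=2 dropped=1
}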
clients/pkg/promtail/client/kafka/config.go (4 deletions)
@@ -15,7 +15,6 @@ const (
 	Timeout = 10 * time.Second

 	ProducerMaxMessageSize int = 10 * 1024 * 1024
-	ConsumerMaxMessageSize int = 10 * 1024 * 1024
 )

 type KafkaConfig struct {
@@ -36,8 +35,6 @@ type KafkaConfig struct {

 	// The max number of message bytes that can be allowed to send to kafka server
 	ProducerMaxMessageSize int `yaml:"producer_max_message_size"`
-	// The max number of message bytes that consumer can fetch from broker
-	ConsumerFetchMaxSize int `yaml:"consumer_fetch_max_size"`
 }

 func DefaultKafkaConfig() KafkaConfig {
@@ -56,6 +53,5 @@ func DefaultKafkaConfig() KafkaConfig {
 		ExternalLabels:         lokiflag.LabelSet{},
 		Timeout:                Timeout,
 		ProducerMaxMessageSize: ProducerMaxMessageSize,
-		ConsumerFetchMaxSize:   ConsumerMaxMessageSize,
 	}
 }
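With the consumer constant, struct field, and default all removed, the one remaining size knob is producer_max_message_size, which flows from the YAML file into KafkaConfig and from there into both the producer's MaxMessageBytes and validateEntry. A small sketch of that round trip using gopkg.in/yaml.v2; the struct is trimmed to what this commit shows, and the url and batch_size tags are inferred from the sample YAML rather than from this file, so treat them as assumptions:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Trimmed KafkaConfig: only fields visible in this commit. The real struct
// also carries Timeout, ExternalLabels, and more.
type KafkaConfig struct {
	Url                    string `yaml:"url"`                       // tag inferred from the sample YAML
	BatchSize              int    `yaml:"batch_size"`                // tag inferred from the sample YAML
	ProducerMaxMessageSize int    `yaml:"producer_max_message_size"` // tag shown in this diff
}

func main() {
	raw := []byte(`
url: "192.168.246.85:9092"
batch_size: 2097152
producer_max_message_size: 2097152
`)
	// Start from the 10 MiB default, as DefaultKafkaConfig does.
	cfg := KafkaConfig{ProducerMaxMessageSize: 10 * 1024 * 1024}
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ProducerMaxMessageSize) // 2097152, overriding the default
}

Note that the sample promtail config in this same commit still sets consumer_fetch_max_size; with the field removed, a plain yaml.Unmarshal ignores the unknown key, while a strict unmarshal would reject it.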
