Skip to content

Commit

Permalink
fix bug graceful shutdown stuck if kafka topic is not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
Artem Chekunov committed Jun 22, 2020
1 parent 8f3a566 commit 0c10108
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

// TaskService holds the configuration for each task
type Service struct {
started bool
stopped chan struct{}
kafka *input.Kafka
clickhouse *output.ClickHouse
Expand All @@ -52,6 +53,7 @@ func NewTaskService(kafka *input.Kafka, clickhouse *output.ClickHouse, p parser.
clickhouse: clickhouse,
p: p,
Name: kafka.Name,
started: false,
}
}

Expand All @@ -70,6 +72,7 @@ func (service *Service) Run() {
if err := service.kafka.Start(); err != nil {
panic(err)
}
service.started = true

log.Infof("TaskService %s TaskService has started", service.clickhouse.GetName())
tick := time.NewTicker(time.Duration(service.FlushInterval) * time.Second)
Expand Down Expand Up @@ -118,13 +121,17 @@ func (service *Service) flush(metrics []model.Metric) {
statistics.UpdateFlushTimespan(service.Name, start)
}

// Stop stop kafak and clickhouse client
// Stop stop kafka and clickhouse client
func (service *Service) Stop() {
log.Infof("%s: close TaskService", service.clickhouse.GetName())
if err := service.kafka.Stop(); err != nil {
panic(err)
}
<-service.stopped

if service.started {
<-service.stopped
}

_ = service.clickhouse.Close()
log.Infof("%s: closed TaskService", service.clickhouse.GetName())
}
Expand Down

0 comments on commit 0c10108

Please sign in to comment.