Skip to content

Commit

Permalink
Batch uploader: added batch_uploader.threads_count setting to process…
Browse files Browse the repository at this point in the history
… batches in parallel
  • Loading branch information
absorbb committed Sep 30, 2022
1 parent c52f3ba commit 5c89db0
Showing 5 changed files with 225 additions and 170 deletions.
2 changes: 2 additions & 0 deletions server/appconfig/appconfig.go
Original file line number Diff line number Diff line change
@@ -107,6 +107,8 @@ func setDefaultParams(containerized bool) {
viper.SetDefault("log.pool.size", 10)
viper.SetDefault("log.rotation_min", 5)

viper.SetDefault("batch_uploader.threads_count", 1)

viper.SetDefault("sql_debug_log.ddl.enabled", true)
viper.SetDefault("sql_debug_log.ddl.rotation_min", "1440")
viper.SetDefault("sql_debug_log.ddl.max_backups", "365") //1 year = 1440 min * 365
18 changes: 9 additions & 9 deletions server/fallback/service.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ const (

var destinationIDExtractRegexp = regexp.MustCompile("failed.dst=(.*)-\\d\\d\\d\\d-\\d\\d-\\d\\dT")

//Service stores and processes fallback files
// Service stores and processes fallback files
type Service struct {
fallbackDir string
fileMask string
@@ -42,12 +42,12 @@ type Service struct {
locks sync.Map
}

//NewTestService returns test instance - only for tests
// NewTestService returns test instance - only for tests
func NewTestService() *Service {
return &Service{}
}

//NewService returns configured Service
// NewService returns configured Service
func NewService(logEventsPath string, destinationService *destinations.Service, usersRecognition events.Recognition) (*Service, error) {
fallbackPath := path.Join(logEventsPath, logevents.FailedDir)
logArchiveEventPath := path.Join(logEventsPath, logevents.ArchiveDir)
@@ -65,7 +65,7 @@ func NewService(logEventsPath string, destinationService *destinations.Service,
}, nil
}

//Replay processes fallback file (or plain file) and store it in the destination
// Replay processes fallback file (or plain file) and store it in the destination
func (s *Service) Replay(fileName, destinationID string, rawFile, skipMalformed bool) error {
if fileName == "" {
return errors.New("File name can't be empty")
@@ -147,7 +147,7 @@ func (s *Service) Replay(fileName, destinationID string, rawFile, skipMalformed
return nil
}

//GetFileStatuses returns all fallback files with their statuses
// GetFileStatuses returns all fallback files with their statuses
func (s *Service) GetFileStatuses(destinationsFilter map[string]bool) []*FileStatus {
files, err := filepath.Glob(s.fileMask)
if err != nil {
@@ -197,8 +197,8 @@ func (s *Service) GetFileStatuses(destinationsFilter map[string]bool) []*FileSta
return fileStatuses
}

//readFileBytes reads file from the file system and returns byte payload or err if occurred
//does unzip if file has been compressed
// readFileBytes reads file from the file system and returns byte payload or err if occurred
// does unzip if file has been compressed
func (s *Service) readFileBytes(filePath string) ([]byte, error) {
b, err := ioutil.ReadFile(filePath)
if err != nil {
@@ -223,7 +223,7 @@ func (s *Service) readFileBytes(filePath string) ([]byte, error) {
return resB.Bytes(), nil
}

//ExtractEvents parses input bytes as plain jsons or fallback jsons or fallback jsons with skipping malformed objects
// ExtractEvents parses input bytes as plain jsons or fallback jsons or fallback jsons with skipping malformed objects
func ExtractEvents(b []byte, rawFile, skipMalformed bool) ([]map[string]interface{}, error) {
var objects []map[string]interface{}
var err error
@@ -234,7 +234,7 @@ func ExtractEvents(b []byte, rawFile, skipMalformed bool) ([]map[string]interfac
} else {
if skipMalformed {
//ignore parsing errors
objects, parseErrors, err = parsers.ParseJSONFileWithFuncFallback(b, events.ParseFallbackJSON)
objects, parseErrors, err = parsers.ParseJSONBytesWithFuncFallback(b, events.ParseFallbackJSON)
} else {
objects, err = parsers.ParseJSONFileWithFunc(b, events.ParseFallbackJSON)
}
Loading
Oops, something went wrong.

0 comments on commit 5c89db0

Please sign in to comment.