Skip to content

Commit

Permalink
Rewrite csv collector as an output (grafana#1948)
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov authored Apr 21, 2021
1 parent 5dc1187 commit f100af4
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 294 deletions.
19 changes: 4 additions & 15 deletions cmd/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (
"github.com/loadimpact/k6/loader"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/output/cloud"
"github.com/loadimpact/k6/output/csv"
"github.com/loadimpact/k6/output/influxdb"
"github.com/loadimpact/k6/output/json"
"github.com/loadimpact/k6/output/statsd"
"github.com/loadimpact/k6/stats"
"github.com/loadimpact/k6/stats/csv"

"github.com/k6io/xk6-output-kafka/pkg/kafka"
)
Expand All @@ -47,9 +47,8 @@ import (
func getAllOutputConstructors() (map[string]func(output.Params) (output.Output, error), error) {
// Start with the built-in outputs
result := map[string]func(output.Params) (output.Output, error){
"json": json.New,
"cloud": cloud.New,

"json": json.New,
"cloud": cloud.New,
"influxdb": influxdb.New,
"kafka": func(params output.Params) (output.Output, error) {
params.Logger.Warn("The kafka output is deprecated, and will be removed in a future k6 version. " +
Expand All @@ -63,17 +62,7 @@ func getAllOutputConstructors() (map[string]func(output.Params) (output.Output,
"Please use the statsd output with env variable K6_STATSD_ENABLE_TAGS=true instead.")
return statsd.NewDatadog(params)
},
"csv": func(params output.Params) (output.Output, error) {
conf, err := csv.GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
if err != nil {
return nil, err
}
csvc, err := csv.New(params.Logger, params.FS, params.ScriptOptions.SystemTags.Map(), conf)
if err != nil {
return nil, err
}
return newCollectorAdapter(params, csvc), nil
},
"csv": csv.New,
}

exts := output.GetExtensions()
Expand Down
2 changes: 1 addition & 1 deletion stats/csv/config.go → output/csv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/loadimpact/k6/lib/types"
)

// Config is the config for the csv collector
// Config is the config for the csv output
type Config struct {
// Samples.
FileName null.String `json:"file_name" envconfig:"K6_CSV_FILENAME"`
Expand Down
File renamed without changes.
142 changes: 73 additions & 69 deletions stats/csv/collector.go → output/csv/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package csv
import (
"bytes"
"compress/gzip"
"context"
"encoding/csv"
"fmt"
"os"
Expand All @@ -33,50 +32,65 @@ import (
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/afero"

"github.com/loadimpact/k6/lib"
"github.com/loadimpact/k6/output"
"github.com/loadimpact/k6/stats"
)

// Collector saving output to csv implements the lib.Collector interface
type Collector struct {
closeFn func() error
fname string
// Output implements the lib.Output interface for saving to CSV files.
type Output struct {
output.SampleBuffer

params output.Params
periodicFlusher *output.PeriodicFlusher

logger logrus.FieldLogger
fname string
csvWriter *csv.Writer
csvLock sync.Mutex
closeFn func() error

resTags []string
ignoredTags []string
csvWriter *csv.Writer
csvLock sync.Mutex
buffer []stats.Sample
bufferLock sync.Mutex
row []string
saveInterval time.Duration
logger logrus.FieldLogger
}

// Verify that Collector implements lib.Collector
var _ lib.Collector = &Collector{}
// New creates a new instance of the CSV output.
func New(params output.Params) (output.Output, error) {
	// Return a literal nil interface on failure: `return newOutput(params)`
	// would box a typed-nil *Output into the output.Output interface, so the
	// returned interface value would compare non-nil even when err != nil
	// (the classic Go typed-nil-in-interface trap).
	o, err := newOutput(params)
	if err != nil {
		return nil, err
	}
	return o, nil
}

// New Creates new instance of CSV collector
func New(logger logrus.FieldLogger, fs afero.Fs, tags stats.TagSet, config Config) (*Collector, error) {
func newOutput(params output.Params) (*Output, error) {
resTags := []string{}
ignoredTags := []string{}
tags := params.ScriptOptions.SystemTags.Map()
for tag, flag := range tags {
if flag {
resTags = append(resTags, tag)
} else {
ignoredTags = append(ignoredTags, tag)
}
}

sort.Strings(resTags)
sort.Strings(ignoredTags)

config, err := GetConsolidatedConfig(params.JSONConfig, params.Environment, params.ConfigArgument)
if err != nil {
return nil, err
}

saveInterval := time.Duration(config.SaveInterval.Duration)
fname := config.FileName.String

logger := params.Logger.WithFields(logrus.Fields{
"output": "csv",
"filename": params.ConfigArgument,
})
if fname == "" || fname == "-" {
stdoutWriter := csv.NewWriter(os.Stdout)
return &Collector{
return &Output{
fname: "-",
resTags: resTags,
ignoredTags: ignoredTags,
Expand All @@ -85,21 +99,23 @@ func New(logger logrus.FieldLogger, fs afero.Fs, tags stats.TagSet, config Confi
saveInterval: saveInterval,
closeFn: func() error { return nil },
logger: logger,
params: params,
}, nil
}

logFile, err := fs.Create(fname)
logFile, err := params.FS.Create(fname)
if err != nil {
return nil, err
}

c := Collector{
c := Output{
fname: fname,
resTags: resTags,
ignoredTags: ignoredTags,
row: make([]string, 3+len(resTags)+1),
saveInterval: saveInterval,
logger: logger,
params: params,
}

if strings.HasSuffix(fname, ".gz") {
Expand All @@ -119,76 +135,64 @@ func New(logger logrus.FieldLogger, fs afero.Fs, tags stats.TagSet, config Confi
return &c, nil
}

// Init writes column names to csv file
func (c *Collector) Init() error {
header := MakeHeader(c.resTags)
err := c.csvWriter.Write(header)
if err != nil {
c.logger.WithField("filename", c.fname).Error("CSV: Error writing column names to file")
// Description returns a human-readable description of the output.
func (o *Output) Description() string {
if o.fname == "" || o.fname == "-" { // TODO rename
return "csv (stdout)"
}
c.csvWriter.Flush()
return nil
return fmt.Sprintf("csv (%s)", o.fname)
}

// Run just blocks until the context is done
func (c *Collector) Run(ctx context.Context) {
ticker := time.NewTicker(c.saveInterval)
defer func() {
err := c.closeFn()
if err != nil {
c.logger.WithField("filename", c.fname).Errorf("CSV: Error closing the file: %v", err)
}
}()

for {
select {
case <-ticker.C:
c.writeToFile()
case <-ctx.Done():
c.writeToFile()
return
}
// Start writes the csv header and starts a new output.PeriodicFlusher
func (o *Output) Start() error {
o.logger.Debug("Starting...")

header := MakeHeader(o.resTags)
err := o.csvWriter.Write(header)
if err != nil {
o.logger.WithField("filename", o.fname).Error("CSV: Error writing column names to file")
}
}
o.csvWriter.Flush()

// Collect Saves samples to buffer
func (c *Collector) Collect(scs []stats.SampleContainer) {
c.bufferLock.Lock()
defer c.bufferLock.Unlock()
for _, sc := range scs {
c.buffer = append(c.buffer, sc.GetSamples()...)
pf, err := output.NewPeriodicFlusher(o.saveInterval, o.flushMetrics)
if err != nil {
return err
}
o.logger.Debug("Started!")
o.periodicFlusher = pf

return nil
}

// Stop flushes any remaining metrics and stops the goroutine.
// It first stops the periodic flusher (which triggers a final flush of
// buffered samples — presumably; confirm against output.PeriodicFlusher),
// then closes the underlying destination via closeFn (a no-op for stdout,
// the gzip/file close chain otherwise). The "Stopped!" message is deferred
// so it is logged even if closeFn panics.
func (o *Output) Stop() error {
	o.logger.Debug("Stopping...")
	defer o.logger.Debug("Stopped!")
	o.periodicFlusher.Stop()
	return o.closeFn()
}

// writeToFile Writes samples to the csv file
func (c *Collector) writeToFile() {
c.bufferLock.Lock()
samples := c.buffer
c.buffer = nil
c.bufferLock.Unlock()
// flushMetrics Writes samples to the csv file
func (o *Output) flushMetrics() {
samples := o.GetBufferedSamples()

if len(samples) > 0 {
c.csvLock.Lock()
defer c.csvLock.Unlock()
o.csvLock.Lock()
defer o.csvLock.Unlock()
for _, sc := range samples {
for _, sample := range sc.GetSamples() {
sample := sample
row := SampleToRow(&sample, c.resTags, c.ignoredTags, c.row)
err := c.csvWriter.Write(row)
row := SampleToRow(&sample, o.resTags, o.ignoredTags, o.row)
err := o.csvWriter.Write(row)
if err != nil {
c.logger.WithField("filename", c.fname).Error("CSV: Error writing to file")
o.logger.WithField("filename", o.fname).Error("CSV: Error writing to file")
}
}
}
c.csvWriter.Flush()
o.csvWriter.Flush()
}
}

// Link returns a dummy string, it's only included to satisfy the lib.Collector interface
func (c *Collector) Link() string {
return c.fname
}

// MakeHeader creates list of column names for csv file
func MakeHeader(tags []string) []string {
tags = append(tags, "extra_tags")
Expand Down
Loading

0 comments on commit f100af4

Please sign in to comment.