Skip to content

Commit

Permalink
implementing all the functions for the counter and gauge metric types
Browse files Browse the repository at this point in the history
metric test is still failing because of missing metrics when the counter is 0
added a lot of TODO's I need to go back and cleanup
  • Loading branch information
slim-bean authored and Ed committed May 31, 2019
1 parent eaecf0f commit 29eaec1
Show file tree
Hide file tree
Showing 17 changed files with 370 additions and 67 deletions.
51 changes: 48 additions & 3 deletions pkg/logentry/metric/counters.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,70 @@
package metric

import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

// Counters is a vector of counters for a each log stream.
const (
CounterInc = "inc"
CounterAdd = "add"

ErrCounterActionRequired = "counter action must be defined as either `inc` or `add`"
ErrCounterInvalidAction = "action %s is not valid, action must be either `inc` or `add`"
)

type CounterConfig struct {
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}

func validateCounterConfig(config *CounterConfig) error {
if config.Action == "" {
return errors.New(ErrCounterActionRequired)
}
if config.Action != CounterInc && config.Action != CounterAdd {
return errors.Errorf(ErrCounterInvalidAction, config.Action)
}
return nil
}

func parseCounterConfig(config interface{}) (*CounterConfig, error) {
cfg := &CounterConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}

// Counters is a vec tor of counters for a each log stream.
type Counters struct {
*metricVec
Cfg *CounterConfig
}

// NewCounters creates a new counter vec.
func NewCounters(name, help string) *Counters {
func NewCounters(name, help string, config interface{}) (*Counters, error) {
cfg, err := parseCounterConfig(config)
if err != nil {
return nil, err
}
err = validateCounterConfig(cfg)
if err != nil {
return nil, err
}
return &Counters{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewCounter(prometheus.CounterOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
})}
}),
Cfg: cfg,
}, nil
}

// With returns the counter associated with a stream labelset.
Expand Down
19 changes: 16 additions & 3 deletions pkg/logentry/metric/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ package metric
import (
"time"

"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/promtail/api"
)

// LogCount counts log line for each stream.
func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewCounters("log_entries_total", "the total count of log entries")
cfg := CounterConfig{
Action: CounterInc,
}
c, err := NewCounters("log_entries_total", "the total count of log entries", cfg)
if err != nil {
//TODO what do we want to do with this??
}
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if err := next.Handle(labels, time, entry); err != nil {
Expand All @@ -23,7 +30,13 @@ func LogCount(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler

// LogSize observes log line size for each stream.
func LogSize(reg prometheus.Registerer, next api.EntryHandler) api.EntryHandler {
c := NewHistograms("log_entries_bytes", "the total count of bytes", prometheus.ExponentialBuckets(16, 2, 8))
cfg := HistogramConfig{
Buckets: prometheus.ExponentialBuckets(16, 2, 8),
}
c, err := NewHistograms("log_entries_bytes", "the total count of bytes", cfg)
if err != nil {
//TODO what do we want to do with this??
}
reg.MustRegister(c)
return api.EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
if err := next.Handle(labels, time, entry); err != nil {
Expand Down
56 changes: 54 additions & 2 deletions pkg/logentry/metric/gauges.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,77 @@
package metric

import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

const (
GaugeSet = "set"
GaugeInc = "inc"
GaugeDec = "dec"
GaugeAdd = "add"
GaugeSub = "sub"

ErrGaugeActionRequired = "gauge action must be defined as `set`, `inc`, `dec`, `add`, or `sub`"
ErrGaugeInvalidAction = "action %s is not valid, action must be `set`, `inc`, `dec`, `add`, or `sub`"
)

type GaugeConfig struct {
Value *string `mapstructure:"value"`
Action string `mapstructure:"action"`
}

func validateGaugeConfig(config *GaugeConfig) error {
if config.Action == "" {
return errors.New(ErrGaugeActionRequired)
}
if config.Action != GaugeSet &&
config.Action != GaugeInc &&
config.Action != GaugeDec &&
config.Action != GaugeAdd &&
config.Action != GaugeSub {
return errors.Errorf(ErrGaugeInvalidAction, config.Action)
}
return nil
}

func parseGaugeConfig(config interface{}) (*GaugeConfig, error) {
cfg := &GaugeConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}

// Gauges is a vector of gauges for a each log stream.
type Gauges struct {
*metricVec
Cfg *GaugeConfig
}

// NewGauges creates a new gauge vec.
func NewGauges(name, help string) *Gauges {
func NewGauges(name, help string, config interface{}) (*Gauges, error) {
cfg, err := parseGaugeConfig(config)
if err != nil {
return nil, err
}
err = validateGaugeConfig(cfg)
if err != nil {
return nil, err
}
return &Gauges{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewGauge(prometheus.GaugeOpts{
Help: help,
Name: name,
ConstLabels: labels,
})
})}
}),
Cfg: cfg,
}, nil
}

// With returns the gauge associated with a stream labelset.
Expand Down
37 changes: 34 additions & 3 deletions pkg/logentry/metric/histograms.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,57 @@
package metric

import (
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

type HistogramConfig struct {
Value *string `mapstructure:"value"`
Buckets []float64 `mapstructure:"buckets"`
}

func validateHistogramConfig(config *HistogramConfig) error {
//TODO is there any validation required?
return nil
}

func parseHistogramConfig(config interface{}) (*HistogramConfig, error) {
cfg := &HistogramConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}

// Histograms is a vector of histograms for a each log stream.
type Histograms struct {
*metricVec
Cfg *HistogramConfig
}

// NewHistograms creates a new histogram vec.
func NewHistograms(name, help string, buckets []float64) *Histograms {
func NewHistograms(name, help string, config interface{}) (*Histograms, error) {
cfg, err := parseHistogramConfig(config)
if err != nil {
return nil, err
}
err = validateHistogramConfig(cfg)
if err != nil {
return nil, err
}
return &Histograms{
metricVec: newMetricVec(func(labels map[string]string) prometheus.Metric {
return prometheus.NewHistogram(prometheus.HistogramOpts{
Help: help,
Name: name,
ConstLabels: labels,
Buckets: buckets,
Buckets: cfg.Buckets,
})
})}
}),
Cfg: cfg,
}, nil
}

// With returns the histogram associated with a stream labelset.
Expand Down
9 changes: 5 additions & 4 deletions pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package stages

import (
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const RFC3339Nano = "RFC3339Nano"

// NewDocker creates a Docker json log format specific pipeline stage.
func NewDocker(logger log.Logger, jobName string) (Stage, error) {
func NewDocker(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
t := "timestamp"
f := RFC3339Nano
o := "output"
Expand Down Expand Up @@ -35,11 +36,11 @@ func NewDocker(logger log.Logger, jobName string) (Stage, error) {
},
}}

return NewPipeline(logger, stages, jobName+"_docker")
return NewPipeline(logger, stages, jobName+"_docker", registerer)
}

// NewCRI creates a CRI format specific pipeline stage
func NewCRI(logger log.Logger, jobName string) (Stage, error) {
func NewCRI(logger log.Logger, jobName string, registerer prometheus.Registerer) (Stage, error) {
t := "time"
f := RFC3339Nano
o := "content"
Expand All @@ -66,5 +67,5 @@ func NewCRI(logger log.Logger, jobName string) (Stage, error) {
},
},
}
return NewPipeline(logger, stages, jobName+"_cri")
return NewPipeline(logger, stages, jobName+"_cri", registerer)
}
5 changes: 3 additions & 2 deletions pkg/logentry/stages/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -64,7 +65,7 @@ func TestNewDocker(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewDocker(util.Logger, "test")
p, err := NewDocker(util.Logger, "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create Docker parser: %s", err)
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestNewCri(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := NewCRI(util.Logger, "test")
p, err := NewCRI(util.Logger, "test", prometheus.DefaultRegisterer)
if err != nil {
t.Fatalf("failed to create CRI parser: %s", err)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -30,9 +31,11 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
if cfg == nil {
return nil, errors.New(ErrEmptyMatchStageConfig)
}
//FIXME PipelineName does not need to be a pointer
if cfg.PipelineName == nil || *cfg.PipelineName == "" {
return nil, errors.New(ErrPipelineNameRequired)
}
//FIXME selector does not need to be a pointer
if cfg.Selector == nil || *cfg.Selector == "" {
return nil, errors.New(ErrSelectorRequired)
}
Expand All @@ -46,7 +49,7 @@ func validateMatcherConfig(cfg *MatcherConfig) ([]*labels.Matcher, error) {
return matchers, nil
}

func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) {
func newMatcherStage(logger log.Logger, config interface{}, registerer prometheus.Registerer) (Stage, error) {
cfg := &MatcherConfig{}
err := mapstructure.Decode(config, cfg)
if err != nil {
Expand All @@ -57,7 +60,7 @@ func newMatcherStage(logger log.Logger, config interface{}) (Stage, error) {
return nil, err
}

pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName)
pl, err := NewPipeline(logger, cfg.Stages, *cfg.PipelineName, registerer)
if err != nil {
return nil, errors.Wrapf(err, "match stage %s failed to create pipeline", *cfg.PipelineName)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/logentry/stages/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/client_golang/prometheus"
)

var pipelineName = "pl_name"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestMatcher(t *testing.T) {
},
},
}
s, err := newMatcherStage(util.Logger, matchConfig)
s, err := newMatcherStage(util.Logger, matchConfig, prometheus.DefaultRegisterer)
if (err != nil) != tt.wantErr {
t.Errorf("withMatcher() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
Loading

0 comments on commit 29eaec1

Please sign in to comment.