Skip to content

Commit

Permalink
Refactor the MetricsEngine to make distributed execution easier
Browse files Browse the repository at this point in the history
Not having it depend on lib.TestRunState means that it can be used both by a local test run, and by a coordinator instance that isn't actually running the test itself.
  • Loading branch information
na-- committed Jul 13, 2023
1 parent f86938d commit 2da7856
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 85 deletions.
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(testState)
me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testState)
metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
25 changes: 19 additions & 6 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,28 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
return err
}

executionState := execScheduler.GetState()
metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger)
if err != nil {
return err
}
if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool {

// We'll need to pipe metrics to the MetricsEngine and process them if any
// of these are enabled: thresholds, end-of-test summary
shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
!testRunState.RuntimeOptions.NoThresholds.Bool)
var metricsIngester *engine.OutputIngester
if shouldProcessMetrics {
err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
if err != nil {
return err
}
// We'll need to pipe metrics to the MetricsEngine if either the
// thresholds or the end-of-test summary are enabled.
outputs = append(outputs, metricsEngine.CreateIngester())
metricsIngester = metricsEngine.CreateIngester()
outputs = append(outputs, metricsIngester)
}

executionState := execScheduler.GetState()
if !testRunState.RuntimeOptions.NoSummary.Bool {
defer func() {
logger.Debug("Generating the end-of-test summary...")
Expand Down Expand Up @@ -208,7 +219,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()

if !testRunState.RuntimeOptions.NoThresholds.Bool {
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
finalizeThresholds := metricsEngine.StartThresholdCalculations(
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,
)
handleFinalThresholdCalculation := func() {
// This gets called after the Samples channel has been closed and
// the OutputManager has flushed all of the cached samples to
Expand Down Expand Up @@ -283,7 +296,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

printExecutionDescription(
c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs,
c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs,
)

// Trap Interrupts, SIGINTs and SIGTERMs.
Expand Down
9 changes: 1 addition & 8 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"go.k6.io/k6/lib/testutils/mockoutput"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/output"
)

Expand Down Expand Up @@ -387,23 +386,17 @@ func TestDataIsolation(t *testing.T) {
execScheduler, err := execution.NewScheduler(testRunState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(testRunState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
defer globalCancel()
runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger)

mockOutput := mockoutput.New()
outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort)
outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort)
samples := make(chan metrics.SampleContainer, 1000)
waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples)
require.NoError(t, err)
defer stopOutputs(nil)

finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
require.Nil(t, finalizeThresholds)

require.Empty(t, runner.defaultGroup.Groups)

stopEmission, err := execScheduler.Init(runCtx, samples)
Expand Down
48 changes: 20 additions & 28 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.k6.io/k6/errext/exitcodes"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/output"
"gopkg.in/guregu/null.v3"
)

Expand All @@ -25,13 +24,11 @@ const thresholdsRate = 2 * time.Second
// aggregated metric sample values. They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
type MetricsEngine struct {
logger logrus.FieldLogger
test *lib.TestRunState
outputIngester *outputIngester
registry *metrics.Registry
logger logrus.FieldLogger

// These can be both top-level metrics or sub-metrics
metricsWithThresholds []*metrics.Metric

metricsWithThresholds []*metrics.Metric
breachedThresholdsCount uint32

// TODO: completely refactor:
Expand All @@ -44,39 +41,31 @@ type MetricsEngine struct {
}

// NewMetricsEngine creates a new metrics Engine with the given parameters.
func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) {
me := &MetricsEngine{
test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
registry: registry,
logger: logger.WithField("component", "metrics-engine"),
ObservedMetrics: make(map[string]*metrics.Metric),
}

if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
}
}

return me, nil
}

// CreateIngester returns a pseudo-Output that uses the given metric samples to
// update the engine's inner state.
func (me *MetricsEngine) CreateIngester() output.Output {
me.outputIngester = &outputIngester{
func (me *MetricsEngine) CreateIngester() *OutputIngester {
return &OutputIngester{
logger: me.logger.WithField("component", "metrics-engine-ingester"),
metricsEngine: me,
cardinality: newCardinalityControl(),
}
return me.outputIngester
}

func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) {
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := me.test.Registry.Get(nameParts[0])
metric := me.registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}
Expand Down Expand Up @@ -125,11 +114,14 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
}
}

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.test.Options.Thresholds {
// InitSubMetricsAndThresholds parses the thresholds from the test Options and
// initializes both the thresholds themselves, as well as any submetrics that
// were referenced in them.
func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error {
for metricName, thresholds := range options.Thresholds {
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.test.RuntimeOptions.NoThresholds.Bool {
if onlyLogErrors {
if err != nil {
me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
Expand All @@ -154,7 +146,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
if options.SystemTags.Has(metrics.TagExpectedResponse) {
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
Expand All @@ -167,10 +159,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(
ingester *OutputIngester,
abortRun func(error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string),
) {
) (finalize func() (breached []string)) {
if len(me.metricsWithThresholds) == 0 {
return nil // no thresholds were defined
}
Expand Down Expand Up @@ -205,9 +197,9 @@ func (me *MetricsEngine) StartThresholdCalculations(
}()

return func() []string {
if me.outputIngester != nil {
if ingester != nil {
// Stop the ingester so we don't get any more metrics
err := me.outputIngester.Stop()
err := ingester.Stop()
if err != nil {
me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.")
}
Expand Down
40 changes: 13 additions & 27 deletions metrics/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ func TestNewMetricsEngineWithThresholds(t *testing.T) {
_, err = trs.Registry.NewMetric("metric2", metrics.Counter)
require.NoError(t, err)

me, err := NewMetricsEngine(trs)
me, err := NewMetricsEngine(trs.Registry, trs.Logger)
require.NoError(t, err)
require.NotNil(t, me)

require.NoError(t, me.InitSubMetricsAndThresholds(trs.Options, false))

assert.Len(t, me.metricsWithThresholds, 2)
}

Expand All @@ -57,7 +59,7 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
t.Parallel()

me := newTestMetricsEngine(t)
_, err := me.test.Registry.NewMetric("metric1", metrics.Counter)
_, err := me.registry.NewMetric("metric1", metrics.Counter)
require.NoError(t, err)

_, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition)
Expand All @@ -69,16 +71,8 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) {
func TestNewMetricsEngineNoThresholds(t *testing.T) {
t.Parallel()

trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
},
}

me, err := NewMetricsEngine(trs)
require.NoError(t, err)
me := newTestMetricsEngine(t)
require.NotNil(t, me)

assert.Empty(t, me.metricsWithThresholds)
}

Expand Down Expand Up @@ -113,9 +107,9 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) {
t.Parallel()
me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{tc.threshold})
Expand All @@ -138,9 +132,9 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {

me := newTestMetricsEngine(t)

m1, err := me.test.Registry.NewMetric("m1", metrics.Counter)
m1, err := me.registry.NewMetric("m1", metrics.Counter)
require.NoError(t, err)
m2, err := me.test.Registry.NewMetric("m2", metrics.Counter)
m2, err := me.registry.NewMetric("m2", metrics.Counter)
require.NoError(t, err)

ths := metrics.NewThresholds([]string{"count>5"})
Expand All @@ -159,18 +153,10 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) {
assert.Empty(t, breached)
}

func newTestMetricsEngine(t *testing.T) MetricsEngine {
trs := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: testutils.NewLogger(t),
Registry: metrics.NewRegistry(),
},
}

return MetricsEngine{
logger: trs.Logger,
test: trs,
}
func newTestMetricsEngine(t *testing.T) *MetricsEngine {
m, err := NewMetricsEngine(metrics.NewRegistry(), testutils.NewLogger(t))
require.NoError(t, err)
return m
}

func zeroTestRunDuration() time.Duration {
Expand Down
14 changes: 7 additions & 7 deletions metrics/engine/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ const (
timeSeriesFirstLimit = 100_000
)

var _ output.Output = &outputIngester{}
var _ output.Output = &OutputIngester{}

// IngesterDescription is a short description for ingester.
// This constant is used by a function in the cmd/ui file to match this output
// and print a special text.
const IngesterDescription = "Internal Metrics Ingester"

// outputIngester implements the output.Output interface and can be used to
// OutputIngester implements the output.Output interface and can be used to
// "feed" the MetricsEngine data from a `k6 run` test run.
type outputIngester struct {
type OutputIngester struct {
output.SampleBuffer
logger logrus.FieldLogger

Expand All @@ -32,12 +32,12 @@ type outputIngester struct {
}

// Description returns a human-readable description of the output.
func (oi *outputIngester) Description() string {
func (oi *OutputIngester) Description() string {
return IngesterDescription
}

// Start the engine by initializing a new output.PeriodicFlusher
func (oi *outputIngester) Start() error {
func (oi *OutputIngester) Start() error {
oi.logger.Debug("Starting...")

pf, err := output.NewPeriodicFlusher(collectRate, oi.flushMetrics)
Expand All @@ -51,15 +51,15 @@ func (oi *outputIngester) Start() error {
}

// Stop flushes any remaining metrics and stops the goroutine.
func (oi *outputIngester) Stop() error {
func (oi *OutputIngester) Stop() error {
oi.logger.Debug("Stopping...")
defer oi.logger.Debug("Stopped!")
oi.periodicFlusher.Stop()
return nil
}

// flushMetrics writes samples to the MetricsEngine.
func (oi *outputIngester) flushMetrics() {
func (oi *OutputIngester) flushMetrics() {
sampleContainers := oi.GetBufferedSamples()
if len(sampleContainers) == 0 {
return
Expand Down
Loading

0 comments on commit 2da7856

Please sign in to comment.