diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 1a8f5eed8df..9893d87bfe4 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -44,7 +44,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa execScheduler, err := execution.NewScheduler(testState) require.NoError(tb, err) - me, err := engine.NewMetricsEngine(testState) + me, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(tb, err) ctx, cancel := context.WithCancel(context.Background()) diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 790b8e14647..65917e0a8ab 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) { execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 89365b23d48..f74f0ed5a20 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -115,7 +115,7 @@ func TestPatchStatus(t *testing.T) { execScheduler, err := execution.NewScheduler(testState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testState) + metricsEngine, err := engine.NewMetricsEngine(testState.Registry, testState.Logger) require.NoError(t, err) globalCtx, globalCancel := context.WithCancel(context.Background()) diff --git a/cmd/run.go b/cmd/run.go index cd69ed043a7..2c640513c17 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -147,17 +147,28 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { return err } - executionState := execScheduler.GetState() - metricsEngine, err := engine.NewMetricsEngine(executionState.Test) + metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, logger) if err != nil { return err } - if !testRunState.RuntimeOptions.NoSummary.Bool || !testRunState.RuntimeOptions.NoThresholds.Bool { + + // We'll need to pipe metrics to the MetricsEngine and process them if any + // of these are enabled: thresholds, end-of-test summary + shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool || + !testRunState.RuntimeOptions.NoThresholds.Bool) + var metricsIngester *engine.OutputIngester + if shouldProcessMetrics { + err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool) + if err != nil { + return err + } // We'll need to pipe metrics to the MetricsEngine if either the // thresholds or the end-of-test summary are enabled. - outputs = append(outputs, metricsEngine.CreateIngester()) + metricsIngester = metricsEngine.CreateIngester() + outputs = append(outputs, metricsIngester) } + executionState := execScheduler.GetState() if !testRunState.RuntimeOptions.NoSummary.Bool { defer func() { logger.Debug("Generating the end-of-test summary...") @@ -208,7 +219,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { }() if !testRunState.RuntimeOptions.NoThresholds.Bool { - finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration) + finalizeThresholds := metricsEngine.StartThresholdCalculations( + metricsIngester, runAbort, executionState.GetCurrentTestRunDuration, + ) handleFinalThresholdCalculation := func() { // This gets called after the Samples channel has been closed and // the OutputManager has flushed all of the cached samples to @@ -283,7 +296,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { } printExecutionDescription( - c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, + c.gs, "local", args[0], "", conf, executionState.ExecutionTuple, executionPlan, outputs, ) // Trap Interrupts, SIGINTs and SIGTERMs. diff --git a/js/runner_test.go b/js/runner_test.go index ed72b9828d3..4b34dcd1641 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -47,7 +47,6 @@ import ( "go.k6.io/k6/lib/testutils/mockoutput" "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" - "go.k6.io/k6/metrics/engine" "go.k6.io/k6/output" ) @@ -387,23 +386,17 @@ func TestDataIsolation(t *testing.T) { execScheduler, err := execution.NewScheduler(testRunState) require.NoError(t, err) - metricsEngine, err := engine.NewMetricsEngine(testRunState) - require.NoError(t, err) - globalCtx, globalCancel := context.WithCancel(context.Background()) defer globalCancel() runCtx, runAbort := execution.NewTestRunContext(globalCtx, testRunState.Logger) mockOutput := mockoutput.New() - outputManager := output.NewManager([]output.Output{mockOutput, metricsEngine.CreateIngester()}, testRunState.Logger, runAbort) + outputManager := output.NewManager([]output.Output{mockOutput}, testRunState.Logger, runAbort) samples := make(chan metrics.SampleContainer, 1000) waitForMetricsFlushed, stopOutputs, err := outputManager.Start(samples) require.NoError(t, err) defer stopOutputs(nil) - finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration) - require.Nil(t, finalizeThresholds) - require.Empty(t, runner.defaultGroup.Groups) stopEmission, err := execScheduler.Init(runCtx, samples) diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 8aa81c11fad..55f0b87fdb7 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -15,7 +15,6 @@ import ( "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" - "go.k6.io/k6/output" "gopkg.in/guregu/null.v3" ) @@ -25,13 +24,11 @@ const thresholdsRate = 2 * time.Second // aggregated metric sample values. They are used to generate the end-of-test // summary and to evaluate the test thresholds. type MetricsEngine struct { - logger logrus.FieldLogger - test *lib.TestRunState - outputIngester *outputIngester + registry *metrics.Registry + logger logrus.FieldLogger // These can be both top-level metrics or sub-metrics - metricsWithThresholds []*metrics.Metric - + metricsWithThresholds []*metrics.Metric breachedThresholdsCount uint32 // TODO: completely refactor: @@ -44,39 +41,31 @@ type MetricsEngine struct { } // NewMetricsEngine creates a new metrics Engine with the given parameters. -func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) { +func NewMetricsEngine(registry *metrics.Registry, logger logrus.FieldLogger) (*MetricsEngine, error) { me := &MetricsEngine{ - test: runState, - logger: runState.Logger.WithField("component", "metrics-engine"), + registry: registry, + logger: logger.WithField("component", "metrics-engine"), ObservedMetrics: make(map[string]*metrics.Metric), } - if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) { - err := me.initSubMetricsAndThresholds() - if err != nil { - return nil, err - } - } - return me, nil } // CreateIngester returns a pseudo-Output that uses the given metric samples to // update the engine's inner state. -func (me *MetricsEngine) CreateIngester() output.Output { - me.outputIngester = &outputIngester{ +func (me *MetricsEngine) CreateIngester() *OutputIngester { + return &OutputIngester{ logger: me.logger.WithField("component", "metrics-engine-ingester"), metricsEngine: me, cardinality: newCardinalityControl(), } - return me.outputIngester } func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) { // TODO: replace with strings.Cut after Go 1.18 nameParts := strings.SplitN(name, "{", 2) - metric := me.test.Registry.Get(nameParts[0]) + metric := me.registry.Get(nameParts[0]) if metric == nil { return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0]) } @@ -125,11 +114,14 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) { } } -func (me *MetricsEngine) initSubMetricsAndThresholds() error { - for metricName, thresholds := range me.test.Options.Thresholds { +// InitSubMetricsAndThresholds parses the thresholds from the test Options and +// initializes both the thresholds themselves, as well as any submetrics that +// were referenced in them. +func (me *MetricsEngine) InitSubMetricsAndThresholds(options lib.Options, onlyLogErrors bool) error { + for metricName, thresholds := range options.Thresholds { metric, err := me.getThresholdMetricOrSubmetric(metricName) - if me.test.RuntimeOptions.NoThresholds.Bool { + if onlyLogErrors { if err != nil { me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName) } @@ -154,7 +146,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // TODO: refactor out of here when https://github.com/grafana/k6/issues/1321 // lands and there is a better way to enable a metric with tag - if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) { + if options.SystemTags.Has(metrics.TagExpectedResponse) { _, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}") if err != nil { return err // shouldn't happen, but ¯\_(ツ)_/¯ @@ -167,10 +159,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error { // StartThresholdCalculations spins up a new goroutine to crunch thresholds and // returns a callback that will stop the goroutine and finalizes calculations. func (me *MetricsEngine) StartThresholdCalculations( + ingester *OutputIngester, abortRun func(error), getCurrentTestRunDuration func() time.Duration, -) (finalize func() (breached []string), -) { +) (finalize func() (breached []string)) { if len(me.metricsWithThresholds) == 0 { return nil // no thresholds were defined } @@ -205,9 +197,9 @@ func (me *MetricsEngine) StartThresholdCalculations( }() return func() []string { - if me.outputIngester != nil { + if ingester != nil { // Stop the ingester so we don't get any more metrics - err := me.outputIngester.Stop() + err := ingester.Stop() if err != nil { me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.") } diff --git a/metrics/engine/engine_test.go b/metrics/engine/engine_test.go index 06267cfa3c8..a941b4671b8 100644 --- a/metrics/engine/engine_test.go +++ b/metrics/engine/engine_test.go @@ -32,10 +32,12 @@ func TestNewMetricsEngineWithThresholds(t *testing.T) { _, err = trs.Registry.NewMetric("metric2", metrics.Counter) require.NoError(t, err) - me, err := NewMetricsEngine(trs) + me, err := NewMetricsEngine(trs.Registry, trs.Logger) require.NoError(t, err) require.NotNil(t, me) + require.NoError(t, me.InitSubMetricsAndThresholds(trs.Options, false)) + assert.Len(t, me.metricsWithThresholds, 2) } @@ -57,7 +59,7 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) { t.Parallel() me := newTestMetricsEngine(t) - _, err := me.test.Registry.NewMetric("metric1", metrics.Counter) + _, err := me.registry.NewMetric("metric1", metrics.Counter) require.NoError(t, err) _, err = me.getThresholdMetricOrSubmetric(tc.metricDefinition) @@ -69,16 +71,8 @@ func TestMetricsEngineGetThresholdMetricOrSubmetricError(t *testing.T) { func TestNewMetricsEngineNoThresholds(t *testing.T) { t.Parallel() - trs := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: testutils.NewLogger(t), - }, - } - - me, err := NewMetricsEngine(trs) - require.NoError(t, err) + me := newTestMetricsEngine(t) require.NotNil(t, me) - assert.Empty(t, me.metricsWithThresholds) } @@ -113,9 +107,9 @@ func TestMetricsEngineEvaluateThresholdNoAbort(t *testing.T) { t.Parallel() me := newTestMetricsEngine(t) - m1, err := me.test.Registry.NewMetric("m1", metrics.Counter) + m1, err := me.registry.NewMetric("m1", metrics.Counter) require.NoError(t, err) - m2, err := me.test.Registry.NewMetric("m2", metrics.Counter) + m2, err := me.registry.NewMetric("m2", metrics.Counter) require.NoError(t, err) ths := metrics.NewThresholds([]string{tc.threshold}) @@ -138,9 +132,9 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) { me := newTestMetricsEngine(t) - m1, err := me.test.Registry.NewMetric("m1", metrics.Counter) + m1, err := me.registry.NewMetric("m1", metrics.Counter) require.NoError(t, err) - m2, err := me.test.Registry.NewMetric("m2", metrics.Counter) + m2, err := me.registry.NewMetric("m2", metrics.Counter) require.NoError(t, err) ths := metrics.NewThresholds([]string{"count>5"}) @@ -159,18 +153,10 @@ func TestMetricsEngineEvaluateIgnoreEmptySink(t *testing.T) { assert.Empty(t, breached) } -func newTestMetricsEngine(t *testing.T) MetricsEngine { - trs := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: testutils.NewLogger(t), - Registry: metrics.NewRegistry(), - }, - } - - return MetricsEngine{ - logger: trs.Logger, - test: trs, - } +func newTestMetricsEngine(t *testing.T) *MetricsEngine { + m, err := NewMetricsEngine(metrics.NewRegistry(), testutils.NewLogger(t)) + require.NoError(t, err) + return m } func zeroTestRunDuration() time.Duration { diff --git a/metrics/engine/ingester.go b/metrics/engine/ingester.go index 62ffee8293e..496b6a6eaa7 100644 --- a/metrics/engine/ingester.go +++ b/metrics/engine/ingester.go @@ -13,16 +13,16 @@ const ( timeSeriesFirstLimit = 100_000 ) -var _ output.Output = &outputIngester{} +var _ output.Output = &OutputIngester{} // IngesterDescription is a short description for ingester. // This variable is used from a function in cmd/ui file for matching this output // and print a special text. const IngesterDescription = "Internal Metrics Ingester" -// outputIngester implements the output.Output interface and can be used to +// OutputIngester implements the output.Output interface and can be used to // "feed" the MetricsEngine data from a `k6 run` test run. -type outputIngester struct { +type OutputIngester struct { output.SampleBuffer logger logrus.FieldLogger @@ -32,12 +32,12 @@ type outputIngester struct { } // Description returns a human-readable description of the output. -func (oi *outputIngester) Description() string { +func (oi *OutputIngester) Description() string { return IngesterDescription } // Start the engine by initializing a new output.PeriodicFlusher -func (oi *outputIngester) Start() error { +func (oi *OutputIngester) Start() error { oi.logger.Debug("Starting...") pf, err := output.NewPeriodicFlusher(collectRate, oi.flushMetrics) @@ -51,7 +51,7 @@ func (oi *outputIngester) Start() error { } // Stop flushes any remaining metrics and stops the goroutine. -func (oi *outputIngester) Stop() error { +func (oi *OutputIngester) Stop() error { oi.logger.Debug("Stopping...") defer oi.logger.Debug("Stopped!") oi.periodicFlusher.Stop() @@ -59,7 +59,7 @@ func (oi *outputIngester) Stop() error { } // flushMetrics Writes samples to the MetricsEngine -func (oi *outputIngester) flushMetrics() { +func (oi *OutputIngester) flushMetrics() { sampleContainers := oi.GetBufferedSamples() if len(sampleContainers) == 0 { return diff --git a/metrics/engine/ingester_test.go b/metrics/engine/ingester_test.go index 71f7a7c8497..b299322e363 100644 --- a/metrics/engine/ingester_test.go +++ b/metrics/engine/ingester_test.go @@ -19,7 +19,7 @@ func TestIngesterOutputFlushMetrics(t *testing.T) { testMetric, err := piState.Registry.NewMetric("test_metric", metrics.Trend) require.NoError(t, err) - ingester := outputIngester{ + ingester := OutputIngester{ logger: piState.Logger, metricsEngine: &MetricsEngine{ ObservedMetrics: make(map[string]*metrics.Metric), @@ -55,9 +55,8 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { require.NoError(t, err) me := &MetricsEngine{ - test: &lib.TestRunState{ - TestPreInitState: piState, - }, + logger: piState.Logger, + registry: piState.Registry, ObservedMetrics: make(map[string]*metrics.Metric), } _, err = me.getThresholdMetricOrSubmetric("test_metric{a:1}") @@ -66,7 +65,7 @@ func TestIngesterOutputFlushSubmetrics(t *testing.T) { // assert that observed metrics is empty before to start require.Empty(t, me.ObservedMetrics) - ingester := outputIngester{ + ingester := OutputIngester{ logger: piState.Logger, metricsEngine: me, cardinality: newCardinalityControl(), @@ -107,7 +106,7 @@ func TestOutputFlushMetricsTimeSeriesWarning(t *testing.T) { require.NoError(t, err) logger, hook := testutils.NewLoggerWithHook(nil) - ingester := outputIngester{ + ingester := OutputIngester{ logger: logger, metricsEngine: &MetricsEngine{ ObservedMetrics: make(map[string]*metrics.Metric),