From a96a8abf4bb5122545ddec5b8412b791eaa5fcf3 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Tue, 31 Aug 2021 15:19:08 +0300 Subject: [PATCH] Copy xk6-execution code back in (#2126) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit works on #1320 Co-authored-by: Ivan Mirić --- core/local/k6execution_test.go | 315 +++++++++++++++++++++++++++ js/initcontext.go | 2 + js/modules/k6/execution/execution.go | 210 ++++++++++++++++++ js/runner_test.go | 99 +++++++++ 4 files changed, 626 insertions(+) create mode 100644 core/local/k6execution_test.go create mode 100644 js/modules/k6/execution/execution.go diff --git a/core/local/k6execution_test.go b/core/local/k6execution_test.go new file mode 100644 index 00000000000..ef51cacff3a --- /dev/null +++ b/core/local/k6execution_test.go @@ -0,0 +1,315 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package local + +import ( + "encoding/json" + "io/ioutil" + "net/url" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/loader" +) + +func TestExecutionInfoVUSharing(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + import { sleep } from 'k6'; + + // The cvus scenario should reuse the two VUs created for the carr scenario. + export let options = { + scenarios: { + carr: { + executor: 'constant-arrival-rate', + exec: 'carr', + rate: 9, + timeUnit: '0.95s', + duration: '1s', + preAllocatedVUs: 2, + maxVUs: 10, + gracefulStop: '100ms', + }, + cvus: { + executor: 'constant-vus', + exec: 'cvus', + vus: 2, + duration: '1s', + startTime: '2s', + gracefulStop: '0s', + }, + }, + }; + + export function cvus() { + const info = Object.assign({scenario: 'cvus'}, exec.vu); + console.log(JSON.stringify(info)); + sleep(0.2); + }; + + export function carr() { + const info = Object.assign({scenario: 'carr'}, exec.vu); + console.log(JSON.stringify(info)); + }; +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + defer cancel() + + type vuStat struct { + iteration uint64 + scIter map[string]uint64 + } + vuStats := map[uint64]*vuStat{} + + type logEntry struct { + IDInInstance uint64 + Scenario string + IterationInInstance uint64 + IterationInScenario uint64 + } + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }() + + select { + case err := <-errCh: + require.NoError(t, err) + entries := logHook.Drain() + assert.InDelta(t, 20, len(entries), 2) + le := &logEntry{} + for _, entry := range entries { + err = json.Unmarshal([]byte(entry.Message), le) + require.NoError(t, err) + assert.Contains(t, []uint64{1, 2}, le.IDInInstance) + if _, ok := vuStats[le.IDInInstance]; !ok { + vuStats[le.IDInInstance] = &vuStat{0, make(map[string]uint64)} + } + if le.IterationInInstance > vuStats[le.IDInInstance].iteration { + vuStats[le.IDInInstance].iteration = le.IterationInInstance + } + if le.IterationInScenario > vuStats[le.IDInInstance].scIter[le.Scenario] { + vuStats[le.IDInInstance].scIter[le.Scenario] = le.IterationInScenario + } + } + require.Len(t, vuStats, 2) + // Both VUs should complete 10 iterations each globally, but 5 + // iterations each per scenario (iterations are 0-based) + for _, v := range vuStats { + assert.Equal(t, uint64(9), v.iteration) + assert.Equal(t, uint64(4), v.scIter["cvus"]) + assert.Equal(t, uint64(4), v.scIter["carr"]) + } + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } +} + +func TestExecutionInfoScenarioIter(t *testing.T) { + t.Parallel() + script := []byte(` + import exec from 'k6/execution'; + + // The pvu scenario should reuse the two VUs created for the carr scenario. + export let options = { + scenarios: { + carr: { + executor: 'constant-arrival-rate', + exec: 'carr', + rate: 9, + timeUnit: '0.95s', + duration: '1s', + preAllocatedVUs: 2, + maxVUs: 10, + gracefulStop: '100ms', + }, + pvu: { + executor: 'per-vu-iterations', + exec: 'pvu', + vus: 2, + iterations: 5, + startTime: '2s', + gracefulStop: '100ms', + }, + }, + }; + + export function pvu() { + const info = Object.assign({VUID: __VU}, exec.scenario); + console.log(JSON.stringify(info)); + } + + export function carr() { + const info = Object.assign({VUID: __VU}, exec.scenario); + console.log(JSON.stringify(info)); + }; +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + defer cancel() + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }() + + scStats := map[string]uint64{} + + type logEntry struct { + Name string + IterationInInstance, VUID uint64 + } + + select { + case err := <-errCh: + require.NoError(t, err) + entries := logHook.Drain() + require.Len(t, entries, 20) + le := &logEntry{} + for _, entry := range entries { + err = json.Unmarshal([]byte(entry.Message), le) + require.NoError(t, err) + assert.Contains(t, []uint64{1, 2}, le.VUID) + if le.IterationInInstance > scStats[le.Name] { + scStats[le.Name] = le.IterationInInstance + } + } + require.Len(t, scStats, 2) + // The global per scenario iteration count should be 9 (iterations + // start at 0), despite VUs being shared or more than 1 being used. + for _, v := range scStats { + assert.Equal(t, uint64(9), v) + } + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } +} + +// Ensure that scenario iterations returned from k6/execution are +// stable during the execution of an iteration. +func TestSharedIterationsStable(t *testing.T) { + t.Parallel() + script := []byte(` + import { sleep } from 'k6'; + import exec from 'k6/execution'; + + export let options = { + scenarios: { + test: { + executor: 'shared-iterations', + vus: 50, + iterations: 50, + }, + }, + }; + export default function () { + sleep(1); + console.log(JSON.stringify(Object.assign({VUID: __VU}, exec.scenario))); + } +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, lib.Options{}) + defer cancel() + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples) }() + + expIters := [50]int64{} + for i := 0; i < 50; i++ { + expIters[i] = int64(i) + } + gotLocalIters, gotGlobalIters := []int64{}, []int64{} + + type logEntry struct{ IterationInInstance, IterationInTest int64 } + + select { + case err := <-errCh: + require.NoError(t, err) + entries := logHook.Drain() + require.Len(t, entries, 50) + le := &logEntry{} + for _, entry := range entries { + err = json.Unmarshal([]byte(entry.Message), le) + require.NoError(t, err) + require.Equal(t, le.IterationInInstance, le.IterationInTest) + gotLocalIters = append(gotLocalIters, le.IterationInInstance) + gotGlobalIters = append(gotGlobalIters, le.IterationInTest) + } + + assert.ElementsMatch(t, expIters, gotLocalIters) + assert.ElementsMatch(t, expIters, gotGlobalIters) + case <-time.After(5 * time.Second): + t.Fatal("timed out") + } +} diff --git a/js/initcontext.go b/js/initcontext.go index e70db8b52bb..4580ce9df5d 100644 --- a/js/initcontext.go +++ b/js/initcontext.go @@ -41,6 +41,7 @@ import ( "go.k6.io/k6/js/modules/k6/crypto/x509" "go.k6.io/k6/js/modules/k6/data" "go.k6.io/k6/js/modules/k6/encoding" + "go.k6.io/k6/js/modules/k6/execution" "go.k6.io/k6/js/modules/k6/grpc" "go.k6.io/k6/js/modules/k6/html" "go.k6.io/k6/js/modules/k6/http" @@ -322,6 +323,7 @@ func getInternalJSModules() map[string]interface{} { "k6/crypto/x509": x509.New(), "k6/data": data.New(), "k6/encoding": encoding.New(), + "k6/execution": execution.New(), "k6/net/grpc": grpc.New(), "k6/html": html.New(), "k6/http": http.New(), diff --git a/js/modules/k6/execution/execution.go b/js/modules/k6/execution/execution.go new file mode 100644 index 00000000000..56e944499ca --- /dev/null +++ b/js/modules/k6/execution/execution.go @@ -0,0 +1,210 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package execution + +import ( + "errors" + "time" + + "github.com/dop251/goja" + + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" + "go.k6.io/k6/lib" +) + +type ( + // RootModule is the global module instance that will create module + // instances for each VU. + RootModule struct{} + + // ModuleInstance represents an instance of the execution module. + ModuleInstance struct { + modules.InstanceCore + obj *goja.Object + } +) + +var ( + _ modules.IsModuleV2 = &RootModule{} + _ modules.Instance = &ModuleInstance{} +) + +// New returns a pointer to a new RootModule instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.IsModuleV2 interface to return +// a new instance for each VU. +func (*RootModule) NewModuleInstance(m modules.InstanceCore) modules.Instance { + mi := &ModuleInstance{InstanceCore: m} + rt := m.GetRuntime() + o := rt.NewObject() + defProp := func(name string, newInfo func() (*goja.Object, error)) { + err := o.DefineAccessorProperty(name, rt.ToValue(func() goja.Value { + obj, err := newInfo() + if err != nil { + common.Throw(rt, err) + } + return obj + }), nil, goja.FLAG_FALSE, goja.FLAG_TRUE) + if err != nil { + common.Throw(rt, err) + } + } + defProp("scenario", mi.newScenarioInfo) + defProp("instance", mi.newInstanceInfo) + defProp("vu", mi.newVUInfo) + + mi.obj = o + + return mi +} + +// GetExports returns the exports of the execution module. +func (mi *ModuleInstance) GetExports() modules.Exports { + return modules.Exports{Default: mi.obj} +} + +// newScenarioInfo returns a goja.Object with property accessors to retrieve +// information about the scenario the current VU is running in. +func (mi *ModuleInstance) newScenarioInfo() (*goja.Object, error) { + ctx := mi.GetContext() + rt := common.GetRuntime(ctx) + vuState := mi.GetState() + if vuState == nil { + return nil, errors.New("getting scenario information in the init context is not supported") + } + if rt == nil { + return nil, errors.New("goja runtime is nil in context") + } + getScenarioState := func() *lib.ScenarioState { + ss := lib.GetScenarioState(mi.GetContext()) + if ss == nil { + common.Throw(rt, errors.New("getting scenario information in the init context is not supported")) + } + return ss + } + + si := map[string]func() interface{}{ + "name": func() interface{} { + return getScenarioState().Name + }, + "executor": func() interface{} { + return getScenarioState().Executor + }, + "startTime": func() interface{} { + //nolint:lll + // Return the timestamp in milliseconds, since that's how JS + // timestamps usually are: + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/Date#time_value_or_timestamp_number + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/now#return_value + return getScenarioState().StartTime.UnixNano() / int64(time.Millisecond) + }, + "progress": func() interface{} { + p, _ := getScenarioState().ProgressFn() + return p + }, + "iterationInInstance": func() interface{} { + return vuState.GetScenarioLocalVUIter() + }, + "iterationInTest": func() interface{} { + return vuState.GetScenarioGlobalVUIter() + }, + } + + return newInfoObj(rt, si) +} + +// newInstanceInfo returns a goja.Object with property accessors to retrieve +// information about the local instance stats. +func (mi *ModuleInstance) newInstanceInfo() (*goja.Object, error) { + ctx := mi.GetContext() + es := lib.GetExecutionState(ctx) + if es == nil { + return nil, errors.New("getting instance information in the init context is not supported") + } + + rt := common.GetRuntime(ctx) + if rt == nil { + return nil, errors.New("goja runtime is nil in context") + } + + ti := map[string]func() interface{}{ + "currentTestRunDuration": func() interface{} { + return float64(es.GetCurrentTestRunDuration()) / float64(time.Millisecond) + }, + "iterationsCompleted": func() interface{} { + return es.GetFullIterationCount() + }, + "iterationsInterrupted": func() interface{} { + return es.GetPartialIterationCount() + }, + "vusActive": func() interface{} { + return es.GetCurrentlyActiveVUsCount() + }, + "vusInitialized": func() interface{} { + return es.GetInitializedVUsCount() + }, + } + + return newInfoObj(rt, ti) +} + +// newVUInfo returns a goja.Object with property accessors to retrieve +// information about the currently executing VU. +func (mi *ModuleInstance) newVUInfo() (*goja.Object, error) { + ctx := mi.GetContext() + vuState := lib.GetState(ctx) + if vuState == nil { + return nil, errors.New("getting VU information in the init context is not supported") + } + + rt := common.GetRuntime(ctx) + if rt == nil { + return nil, errors.New("goja runtime is nil in context") + } + + vi := map[string]func() interface{}{ + "idInInstance": func() interface{} { return vuState.VUID }, + "idInTest": func() interface{} { return vuState.VUIDGlobal }, + "iterationInInstance": func() interface{} { return vuState.Iteration }, + "iterationInScenario": func() interface{} { + return vuState.GetScenarioVUIter() + }, + } + + return newInfoObj(rt, vi) +} + +func newInfoObj(rt *goja.Runtime, props map[string]func() interface{}) (*goja.Object, error) { + o := rt.NewObject() + + for p, get := range props { + err := o.DefineAccessorProperty(p, rt.ToValue(get), nil, goja.FLAG_FALSE, goja.FLAG_TRUE) + if err != nil { + return nil, err + } + } + + return o, nil +} diff --git a/js/runner_test.go b/js/runner_test.go index 473ee1810a0..c29048a21a4 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -2021,3 +2021,102 @@ func TestMinIterationDurationIsCancellable(t *testing.T) { require.NoError(t, err) } } + +func TestExecutionInfo(t *testing.T) { + t.Parallel() + + testCases := []struct { + name, script, expErr string + }{ + {name: "vu_ok", script: ` + var exec = require('k6/execution'); + + exports.default = function() { + if (exec.vu.idInInstance !== 1) throw new Error('unexpected VU ID: '+exec.vu.idInInstance); + if (exec.vu.idInTest !== 10) throw new Error('unexpected global VU ID: '+exec.vu.idInTest); + if (exec.vu.iterationInInstance !== 0) throw new Error('unexpected VU iteration: '+exec.vu.iterationInInstance); + if (exec.vu.iterationInScenario !== 0) throw new Error('unexpected scenario iteration: '+exec.vu.iterationInScenario); + }`}, + {name: "vu_err", script: ` + var exec = require('k6/execution'); + exec.vu; + `, expErr: "getting VU information in the init context is not supported"}, + {name: "scenario_ok", script: ` + var exec = require('k6/execution'); + var sleep = require('k6').sleep; + + exports.default = function() { + var si = exec.scenario; + sleep(0.1); + if (si.name !== 'default') throw new Error('unexpected scenario name: '+si.name); + if (si.executor !== 'test-exec') throw new Error('unexpected executor: '+si.executor); + if (si.startTime > new Date().getTime()) throw new Error('unexpected startTime: '+si.startTime); + if (si.progress !== 0.1) throw new Error('unexpected progress: '+si.progress); + if (si.iterationInInstance !== 3) throw new Error('unexpected scenario local iteration: '+si.iterationInInstance); + if (si.iterationInTest !== 4) throw new Error('unexpected scenario local iteration: '+si.iterationInTest); + }`}, + {name: "scenario_err", script: ` + var exec = require('k6/execution'); + exec.scenario; + `, expErr: "getting scenario information in the init context is not supported"}, + {name: "test_ok", script: ` + var exec = require('k6/execution'); + + exports.default = function() { + var ti = exec.instance; + if (ti.currentTestRunDuration !== 0) throw new Error('unexpected test duration: '+ti.currentTestRunDuration); + if (ti.vusActive !== 1) throw new Error('unexpected vusActive: '+ti.vusActive); + if (ti.vusInitialized !== 0) throw new Error('unexpected vusInitialized: '+ti.vusInitialized); + if (ti.iterationsCompleted !== 0) throw new Error('unexpected iterationsCompleted: '+ti.iterationsCompleted); + if (ti.iterationsInterrupted !== 0) throw new Error('unexpected iterationsInterrupted: '+ti.iterationsInterrupted); + }`}, + {name: "test_err", script: ` + var exec = require('k6/execution'); + exec.instance; + `, expErr: "getting instance information in the init context is not supported"}, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + r, err := getSimpleRunner(t, "/script.js", tc.script) + if tc.expErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.expErr) + return + } + require.NoError(t, err) + + samples := make(chan stats.SampleContainer, 100) + initVU, err := r.NewVU(1, 10, samples) + require.NoError(t, err) + + execScheduler, err := local.NewExecutionScheduler(r, testutils.NewLogger(t)) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctx = lib.WithExecutionState(ctx, execScheduler.GetState()) + ctx = lib.WithScenarioState(ctx, &lib.ScenarioState{ + Name: "default", + Executor: "test-exec", + StartTime: time.Now(), + ProgressFn: func() (float64, []string) { + return 0.1, nil + }, + }) + vu := initVU.Activate(&lib.VUActivationParams{ + RunContext: ctx, + Exec: "default", + GetNextIterationCounters: func() (uint64, uint64) { return 3, 4 }, + }) + + execState := execScheduler.GetState() + execState.ModCurrentlyActiveVUsCount(+1) + err = vu.RunOnce() + assert.NoError(t, err) + }) + } +}