Skip to content

Commit

Permalink
Add sampling capabilities to the tracing module (grafana#2886)
Browse files Browse the repository at this point in the history
* Improve assertHasTraceIDMetadata robustness

* Add Sampler interface and ProbabilisticSampler implementation

 This commit adds a Sampler interface defining a contract for
 implementing support for sampling. This means that any client of a
 sampler could go through the interface to decide whether or not to
 perform an action or keep some piece of data.

 This commit adds a ProbabilisticSampler implementation of Sampler,
 meant to be implemented by propagators to decide whether they should
 set their sampling flag to true or false.

 This commit also adds a chance function which returns true with a
 `percentage` of chance, to support the ProbabilisticSampler implem.

* Make Propagators support sampling by implementing the Sampler interface

This commit defines a SamplerPropagator interface, which established
the contract for a propagator which bases its sampling flag on a
Sampler implementation.

It also makes the concrete implementations of Propagators implement
the Sampler interface, and ensure the flags field of the generated
trace context headers is set based on the Sampler's result.

* Implement support for the sampling option

* Add sampling support in the tracing Client

* Add tracing module sampling example

* Address PR feedback from @codebien
  • Loading branch information
oleiade authored Mar 10, 2023
1 parent f893e83 commit 314d9d5
Show file tree
Hide file tree
Showing 9 changed files with 482 additions and 31 deletions.
143 changes: 137 additions & 6 deletions cmd/tests/tracing_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.k6.io/k6/cmd"
"go.k6.io/k6/lib/testutils/httpmultibin"
"go.k6.io/k6/metrics"
)

func TestTracingModuleClient(t *testing.T) {
Expand Down Expand Up @@ -58,7 +59,7 @@ func TestTracingModuleClient(t *testing.T) {
jsonResults, err := afero.ReadFile(ts.FS, "results.json")
require.NoError(t, err)

assertHasTraceIDMetadata(t, jsonResults)
assertHasTraceIDMetadata(t, jsonResults, 9, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

func TestTracingClient_DoesNotInterfereWithHTTPModule(t *testing.T) {
Expand Down Expand Up @@ -100,6 +101,107 @@ func TestTracingClient_DoesNotInterfereWithHTTPModule(t *testing.T) {
assert.Equal(t, int64(2), atomic.LoadInt64(&gotInstrumentedRequests))
}

func TestTracingModuleClient_HundredPercentSampling(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)

var gotRequests int64
var gotSampleFlags int64

tb.Mux.HandleFunc("/tracing", func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&gotRequests, 1)

traceparent := r.Header.Get("traceparent")
require.NotEmpty(t, traceparent)
require.Len(t, traceparent, 55)

if traceparent[54] == '1' {
atomic.AddInt64(&gotSampleFlags, 1)
}
})

script := tb.Replacer.Replace(`
import http from "k6/http";
import { check } from "k6";
import tracing from "k6/experimental/tracing";
export const options = {
// 100 iterations to make sure we get 100% sampling
iterations: 100,
}
const instrumentedHTTP = new tracing.Client({
propagator: "w3c",
// 100% sampling
sampling: 1.0,
})
export default function () {
instrumentedHTTP.get("HTTPBIN_IP_URL/tracing");
};
`)

ts := getSingleFileTestState(t, script, []string{"--out", "json=results.json"}, 0)
cmd.ExecuteWithGlobalState(ts.GlobalState)

assert.Equal(t, int64(100), atomic.LoadInt64(&gotSampleFlags))
assert.Equal(t, int64(100), atomic.LoadInt64(&gotRequests))

jsonResults, err := afero.ReadFile(ts.FS, "results.json")
require.NoError(t, err)

assertHasTraceIDMetadata(t, jsonResults, 100, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

func TestTracingModuleClient_ZeroPercentSampling(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)

var gotRequests int64
var gotSampleFlags int64

tb.Mux.HandleFunc("/tracing", func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt64(&gotRequests, 1)

traceparent := r.Header.Get("traceparent")
require.NotEmpty(t, traceparent)
require.Len(t, traceparent, 55)

if traceparent[54] == '1' {
atomic.AddInt64(&gotSampleFlags, 1)
}
})

script := tb.Replacer.Replace(`
import http from "k6/http";
import { check } from "k6";
import tracing from "k6/experimental/tracing";
export const options = {
// 100 iterations to make sure we get 100% sampling
iterations: 100,
}
const instrumentedHTTP = new tracing.Client({
propagator: "w3c",
// 0% sampling
sampling: 0.0,
})
export default function () {
instrumentedHTTP.get("HTTPBIN_IP_URL/tracing");
};
`)

ts := getSingleFileTestState(t, script, []string{}, 0)
cmd.ExecuteWithGlobalState(ts.GlobalState)

assert.Equal(t, int64(0), atomic.LoadInt64(&gotSampleFlags))
assert.Equal(t, int64(100), atomic.LoadInt64(&gotRequests))
}

func TestTracingInstrumentHTTP_W3C(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
Expand Down Expand Up @@ -141,7 +243,7 @@ func TestTracingInstrumentHTTP_W3C(t *testing.T) {
jsonResults, err := afero.ReadFile(ts.FS, "results.json")
require.NoError(t, err)

assertHasTraceIDMetadata(t, jsonResults)
assertHasTraceIDMetadata(t, jsonResults, 9, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

func TestTracingInstrumentHTTP_Jaeger(t *testing.T) {
Expand Down Expand Up @@ -185,7 +287,7 @@ func TestTracingInstrumentHTTP_Jaeger(t *testing.T) {
jsonResults, err := afero.ReadFile(ts.FS, "results.json")
require.NoError(t, err)

assertHasTraceIDMetadata(t, jsonResults)
assertHasTraceIDMetadata(t, jsonResults, 8, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

func TestTracingInstrumentHTTP_FillsParams(t *testing.T) {
Expand Down Expand Up @@ -236,7 +338,7 @@ func TestTracingInstrumentHTTP_FillsParams(t *testing.T) {
jsonResults, err := afero.ReadFile(ts.FS, "results.json")
require.NoError(t, err)

assertHasTraceIDMetadata(t, jsonResults)
assertHasTraceIDMetadata(t, jsonResults, 8, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

func TestTracingInstrummentHTTP_SupportsMultipleTestScripts(t *testing.T) {
Expand Down Expand Up @@ -288,14 +390,23 @@ func TestTracingInstrummentHTTP_SupportsMultipleTestScripts(t *testing.T) {
require.NoError(t, err)

assert.Equal(t, int64(1), atomic.LoadInt64(&gotRequests))
assertHasTraceIDMetadata(t, jsonResults)
assertHasTraceIDMetadata(t, jsonResults, 1, tb.Replacer.Replace("HTTPBIN_IP_URL/tracing"))
}

// assertHasTraceIDMetadata checks that the trace_id metadata is present and has the correct format
// for all http metrics in the json results file.
func assertHasTraceIDMetadata(t *testing.T, jsonResults []byte) {
//
// The `expectOccurences` parameter is used to check that the trace_id metadata is present for the
// expected number of http metrics. For instance, in a script with 2 http requests, the
// `expectOccurences` parameter should be 2.
//
// The onUrls parameter is used to check that the trace_id metadata is present for the data points
// with the expected URLs. Its value should reflect the URLs used in the script.
func assertHasTraceIDMetadata(t *testing.T, jsonResults []byte, expectOccurences int, onUrls ...string) {
gotHTTPDataPoints := false

urlHTTPTraceIDs := make(map[string]map[string]int)

for _, jsonLine := range bytes.Split(jsonResults, []byte("\n")) {
if len(jsonLine) == 0 {
continue
Expand All @@ -322,9 +433,28 @@ func assertHasTraceIDMetadata(t *testing.T, jsonResults []byte) {
require.True(t, gotTraceID)

assert.Len(t, traceID, 32)

if _, ok := urlHTTPTraceIDs[line.Data.Tags["url"]]; !ok {
urlHTTPTraceIDs[line.Data.Tags["url"]] = make(map[string]int)
urlHTTPTraceIDs[line.Data.Tags["url"]][metrics.HTTPReqsName] = 0
}
urlHTTPTraceIDs[line.Data.Tags["url"]][line.Metric]++
}

assert.True(t, gotHTTPDataPoints)

for _, url := range onUrls {
assert.Contains(t, urlHTTPTraceIDs, url)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqsName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqFailedName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqDurationName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqBlockedName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqConnectingName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqTLSHandshakingName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqSendingName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqWaitingName], expectOccurences)
assert.Equal(t, urlHTTPTraceIDs[url][metrics.HTTPReqReceivingName], expectOccurences)
}
}

// sampleEnvelope is a trimmed version of the struct found
Expand All @@ -335,6 +465,7 @@ type sampleEnvelope struct {
Type string `json:"type"`
Data struct {
Value float64 `json:"value"`
Tags map[string]string `json:"tags"`
Metadata map[string]interface{} `json:"metadata"`
} `json:"data"`
}
16 changes: 11 additions & 5 deletions js/modules/k6/experimental/tracing/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ type Client struct {
asyncRequestFunc HTTPAsyncRequestFunc
}

// HTTPRequestFunc is a type alias representing the prototype of
// k6's http module's request function
type (
HTTPRequestFunc func(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error)
// HTTPRequestFunc is a type alias representing the prototype of k6's http module's request function
HTTPRequestFunc func(method string, url goja.Value, args ...goja.Value) (*httpmodule.Response, error)

// HTTPAsyncRequestFunc is a type alias representing the prototype of k6's http module's asyncRequest function
HTTPAsyncRequestFunc func(method string, url goja.Value, args ...goja.Value) (*goja.Promise, error)
)

Expand Down Expand Up @@ -85,11 +86,16 @@ func (c *Client) Configure(opts options) error {
return fmt.Errorf("invalid options: %w", err)
}

var sampler Sampler = NewAlwaysOnSampler()
if opts.Sampling != 1.0 {
sampler = NewProbabilisticSampler(opts.Sampling)
}

switch opts.Propagator {
case "w3c":
c.propagator = &W3CPropagator{}
c.propagator = NewW3CPropagator(sampler)
case "jaeger":
c.propagator = &JaegerPropagator{}
c.propagator = NewJaegerPropagator(sampler)
default:
return fmt.Errorf("unknown propagator: %s", opts.Propagator)
}
Expand Down
19 changes: 19 additions & 0 deletions js/modules/k6/experimental/tracing/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,22 @@ func randHexString(n int) string {

return string(b)
}

// chance returns true with a `percentage` chance, otherwise false.
// the `percentage` argument is expected to be
// within 0 <= percentage <= 100 range.
//
// The chance function works under the assumption that the
// go rand module has been seeded with a non-deterministic
// value.
func chance(r *rand.Rand, percentage float64) bool {
if percentage == 0.0 {
return false
}

if percentage == 1.0 {
return true
}

return r.Float64() < percentage
}
15 changes: 13 additions & 2 deletions js/modules/k6/experimental/tracing/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package tracing
import (
"errors"
"fmt"
"math/rand"
"time"

"github.com/dop251/goja"
"go.k6.io/k6/js/common"
Expand All @@ -19,6 +21,9 @@ type (
ModuleInstance struct {
vu modules.VU

// random is a random number generator used by the module.
random *rand.Rand

// Client holds the module's default tracing client.
*Client
}
Expand All @@ -40,6 +45,12 @@ func New() *RootModule {
func (*RootModule) NewModuleInstance(vu modules.VU) modules.Instance {
return &ModuleInstance{
vu: vu,

// Seed the random number generator with the current time.
// This ensures that any call to rand.Intn() will return
// less-deterministic results.
//nolint:gosec // we don't need cryptographic randomness here
random: rand.New(rand.NewSource(time.Now().UTC().UnixNano())),
}
}

Expand All @@ -66,8 +77,8 @@ func (mi *ModuleInstance) newClient(cc goja.ConstructorCall) *goja.Object {
common.Throw(rt, errors.New("Client constructor expects a single configuration object as argument; none given"))
}

var opts options
if err := rt.ExportTo(cc.Arguments[0], &opts); err != nil {
opts, err := newOptions(rt, cc.Arguments[0])
if err != nil {
common.Throw(rt, fmt.Errorf("unable to parse options object; reason: %w", err))
}

Expand Down
39 changes: 32 additions & 7 deletions js/modules/k6/experimental/tracing/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,45 @@ package tracing
import (
"errors"
"fmt"

"github.com/dop251/goja"
)

// options are the options that can be passed to the
// tracing.instrumentHTTP() method.
type options struct {
// Propagation is the propagation format to use for the tracer.
Propagator string `js:"propagator"`
Propagator string `json:"propagator"`

// Sampling is the sampling rate to use for the tracer.
Sampling *float64 `js:"sampling"`
// Sampling is the sampling rate to use for the
// tracer, expressed in percents within the
// bounds: 0.0 <= n <= 1.0.
Sampling float64 `json:"sampling"`

// Baggage is a map of baggage items to add to the tracer.
Baggage map[string]string `js:"baggage"`
Baggage map[string]string `json:"baggage"`
}

// defaultSamplingRate is the default sampling rate applied to options.
const defaultSamplingRate float64 = 1.0

// newOptions returns a new options object from the given goja.Value.
//
// Note that if the sampling field value is absent, or nullish, we'll
// set it to the `defaultSamplingRate` value.
func newOptions(rt *goja.Runtime, from goja.Value) (options, error) {
var opts options

if err := rt.ExportTo(from, &opts); err != nil {
return opts, fmt.Errorf("unable to parse options object; reason: %w", err)
}

fromSamplingValue := from.ToObject(rt).Get("sampling")
if fromSamplingValue == nil || isNullish(fromSamplingValue) {
opts.Sampling = defaultSamplingRate
}

return opts, nil
}

func (i *options) validate() error {
Expand All @@ -27,9 +53,8 @@ func (i *options) validate() error {
return fmt.Errorf("unknown propagator: %s", i.Propagator)
}

// TODO: implement sampling support
if i.Sampling != nil {
return errors.New("sampling is not yet supported")
if i.Sampling < 0.0 || i.Sampling > 1.0 {
return errors.New("sampling rate must be between 0.0 and 1.0")
}

// TODO: implement baggage support
Expand Down
Loading

0 comments on commit 314d9d5

Please sign in to comment.