Skip to content

Commit

Permalink
wasm: Add metrics for wasm resolver evaluations
Browse files Browse the repository at this point in the history
This plumbs through metrics to the wasm evaluation, adding several
new timers. They will show up when using a Wasm bundle with any of the
usual evaluation mechanisms (eg opa eval, bench, server requests etc)

Signed-off-by: Patrick East <[email protected]>
  • Loading branch information
patrick-east authored and tsandall committed Nov 6, 2020
1 parent 55cdee0 commit 197c838
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 30 deletions.
4 changes: 2 additions & 2 deletions internal/wasm/sdk/examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func main() {
return
}

result, err := rego.Eval(ctx, entrypointID, &input)
result, err := rego.Eval(ctx, opa.EvalOpts{Entrypoint: entrypointID, Input: &input})
if err != nil {
fmt.Printf("error: %v\n", err)
return
Expand Down Expand Up @@ -105,7 +105,7 @@ func main() {
return
}

result, err = rego.Eval(ctx, entrypointID, &input)
result, err = rego.Eval(ctx, opa.EvalOpts{Entrypoint: entrypointID, Input: &input})
if err != nil {
fmt.Printf("error: %v\n", err)
return
Expand Down
2 changes: 1 addition & 1 deletion internal/wasm/sdk/examples/loaders/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func main() {
return
}

result, err := rego.Eval(ctx, entrypointID, &input)
result, err := rego.Eval(ctx, opa.EvalOpts{Entrypoint: entrypointID, Input: &input})
if err != nil {
fmt.Printf("error: %v\n", err)
return
Expand Down
28 changes: 23 additions & 5 deletions internal/wasm/sdk/opa/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"fmt"
"runtime"
"sync"

"github.com/open-policy-agent/opa/metrics"
)

// OPA executes WebAssembly compiled Rego policies.
Expand Down Expand Up @@ -136,23 +138,35 @@ func (o *OPA) setPolicyData(policy []byte, data []byte) error {
return nil
}

// EvalOpts define options for performing an evaluation
type EvalOpts struct {
Entrypoint EntrypointID
Input *interface{}
Metrics metrics.Metrics
}

// Eval evaluates the policy with the given input, returning the
// evaluation results. If no policy was configured at construction
// time nor set after, the function returns ErrNotReady. It returns
// ErrInternal if any other error occurs.
func (o *OPA) Eval(ctx context.Context, entrypoint EntrypointID, input *interface{}) (*Result, error) {
func (o *OPA) Eval(ctx context.Context, opts EvalOpts) (*Result, error) {
if o.pool == nil {
return nil, ErrNotReady
}

instance, err := o.pool.Acquire(ctx)
m := opts.Metrics
if m == nil {
m = metrics.New()
}

instance, err := o.pool.Acquire(ctx, m)
if err != nil {
return nil, err
}

defer o.pool.Release(instance)

result, err := instance.Eval(ctx, entrypoint, input)
result, err := instance.Eval(ctx, opts.Entrypoint, opts.Input, m)
if err != nil {
return nil, fmt.Errorf("%v: %w", err, ErrInternal)
}
Expand All @@ -176,7 +190,7 @@ func (o *OPA) Close() {

// Entrypoints returns a mapping of entrypoint name to ID for use by Eval() and EvalBool().
func (o *OPA) Entrypoints(ctx context.Context) (map[string]EntrypointID, error) {
instance, err := o.pool.Acquire(ctx)
instance, err := o.pool.Acquire(ctx, metrics.New())
if err != nil {
return nil, err
}
Expand All @@ -190,8 +204,12 @@ func (o *OPA) Entrypoints(ctx context.Context) (map[string]EntrypointID, error)
// possible error values returned are as with Eval with addition of
// ErrUndefined indicating an undefined policy decision and
// ErrNonBoolean indicating a non-boolean policy decision.
// Deprecated: Use Eval instead.
func EvalBool(ctx context.Context, o *OPA, entrypoint EntrypointID, input *interface{}) (bool, error) {
rs, err := o.Eval(ctx, entrypoint, input)
rs, err := o.Eval(ctx, EvalOpts{
Entrypoint: entrypoint,
Input: input,
})
if err != nil {
return false, err
}
Expand Down
24 changes: 12 additions & 12 deletions internal/wasm/sdk/opa/opa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestOPA(t *testing.T) {
if len(data) == 0 {
data = nil
}
opa, err := opa.New().
instance, err := opa.New().
WithPolicyBytes(policy).
WithDataBytes(data).
WithMemoryLimits(131070, 0).
Expand All @@ -123,24 +123,24 @@ func TestOPA(t *testing.T) {
case eval.NewPolicy != "" && eval.NewData != "":
policy := compileRegoToWasm(eval.NewPolicy, test.Query)
data := parseJSON(eval.NewData)
if err := opa.SetPolicyData(policy, data); err != nil {
if err := instance.SetPolicyData(policy, data); err != nil {
t.Errorf(err.Error())
}

case eval.NewPolicy != "":
policy := compileRegoToWasm(eval.NewPolicy, test.Query)
if err := opa.SetPolicy(policy); err != nil {
if err := instance.SetPolicy(policy); err != nil {
t.Errorf(err.Error())
}

case eval.NewData != "":
data := parseJSON(eval.NewData)
if err := opa.SetData(*data); err != nil {
if err := instance.SetData(*data); err != nil {
t.Errorf(err.Error())
}
}

result, err := opa.Eval(context.Background(), 0, parseJSON(eval.Input))
result, err := instance.Eval(context.Background(), opa.EvalOpts{Input: parseJSON(eval.Input)})
if err != nil {
t.Errorf(err.Error())
}
Expand All @@ -150,7 +150,7 @@ func TestOPA(t *testing.T) {
}
}

opa.Close()
instance.Close()
})
}
}
Expand Down Expand Up @@ -183,13 +183,13 @@ func TestNamedEntrypoint(t *testing.T) {
t.Fatalf("Unexpected error: %s", err)
}

opa, _ := opa.New().
instance, _ := opa.New().
WithPolicyBytes(compiler.Bundle().WasmModules[0].Raw).
WithMemoryLimits(131070, 2*131070). // TODO: For some reason unlimited memory slows down the eval_ctx_new().
WithPoolSize(1).
Init()

eps, err := opa.Entrypoints(ctx)
eps, err := instance.Entrypoints(ctx)
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
Expand All @@ -198,7 +198,7 @@ func TestNamedEntrypoint(t *testing.T) {
t.Fatalf("Expected 2 entrypoints, got: %+v", eps)
}

a, err := opa.Eval(ctx, eps["test/a"], nil)
a, err := instance.Eval(ctx, opa.EvalOpts{Entrypoint: eps["test/a"]})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
Expand All @@ -208,7 +208,7 @@ func TestNamedEntrypoint(t *testing.T) {
t.Fatalf("Expected result for 'test/a' to be %s, got: %s", exp, string(util.MustMarshalJSON(a.Result)))
}

b, err := opa.Eval(ctx, eps["test/b"], nil)
b, err := instance.Eval(ctx, opa.EvalOpts{Entrypoint: eps["test/b"]})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
Expand All @@ -220,7 +220,7 @@ func TestNamedEntrypoint(t *testing.T) {

func BenchmarkWasmRego(b *testing.B) {
policy := compileRegoToWasm("a = true", "data.p.a = x")
opa, _ := opa.New().
instance, _ := opa.New().
WithPolicyBytes(policy).
WithMemoryLimits(131070, 2*131070). // TODO: For some reason unlimited memory slows down the eval_ctx_new().
WithPoolSize(1).
Expand All @@ -233,7 +233,7 @@ func BenchmarkWasmRego(b *testing.B) {
var input interface{} = make(map[string]interface{})

for i := 0; i < b.N; i++ {
if _, err := opa.Eval(ctx, 0, &input); err != nil {
if _, err := instance.Eval(ctx, opa.EvalOpts{Input: &input}); err != nil {
panic(err)
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal/wasm/sdk/opa/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"context"
"fmt"
"sync"

"github.com/open-policy-agent/opa/metrics"
)

// pool maintains a pool of WebAssemly VM instances.
Expand Down Expand Up @@ -46,7 +48,10 @@ func newPool(poolSize, memoryMinPages, memoryMaxPages uint32) *pool {
// Acquire obtains a VM from the pool, waiting if all VMms are in use
// and building one as necessary. Returns either ErrNotReady or
// ErrInternal if an error.
func (p *pool) Acquire(ctx context.Context) (*vm, error) {
func (p *pool) Acquire(ctx context.Context, metrics metrics.Metrics) (*vm, error) {
metrics.Timer("opa_wasm_pool_acquire_vm").Start()
defer metrics.Timer("opa_wasm_pool_acquire_vm").Stop()

select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
18 changes: 15 additions & 3 deletions internal/wasm/sdk/opa/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,14 @@ func newVM(policy []byte, data []byte, memoryMin, memoryMax uint32) (*vm, error)
return v, nil
}

func (i *vm) Eval(ctx context.Context, entrypoint EntrypointID, input *interface{}) (interface{}, error) {
if err := i.setHeapState(i.evalHeapPtr); err != nil {
func (i *vm) Eval(ctx context.Context, entrypoint EntrypointID, input *interface{}, metrics metrics.Metrics) (interface{}, error) {
metrics.Timer("opa_wasm_vm_eval_total").Start()
defer metrics.Timer("opa_wasm_vm_eval_total").Stop()

metrics.Timer("opa_wasm_vm_eval_set_heap_state").Start()
err := i.setHeapState(i.evalHeapPtr)
metrics.Timer("opa_wasm_vm_eval_set_heap_state").Stop()
if err != nil {
return nil, err
}

Expand All @@ -180,7 +186,7 @@ func (i *vm) Eval(ctx context.Context, entrypoint EntrypointID, input *interface
}()

// Parse the input JSON and activate it with the data.

metrics.Timer("opa_wasm_vm_eval_prepare_input").Start()
addr, err := i.evalCtxNew()
if err != nil {
return nil, err
Expand Down Expand Up @@ -209,8 +215,10 @@ func (i *vm) Eval(ctx context.Context, entrypoint EntrypointID, input *interface
return nil, err
}
}
metrics.Timer("opa_wasm_vm_eval_prepare_input").Stop()

// Evaluate the policy.
metrics.Timer("opa_wasm_vm_eval").Start()
func() {
defer func() {
if e := recover(); e != nil {
Expand All @@ -228,16 +236,20 @@ func (i *vm) Eval(ctx context.Context, entrypoint EntrypointID, input *interface
_, err = i.eval(ctxAddr)
}()

metrics.Timer("opa_wasm_vm_eval").Stop()

if err != nil {
return nil, err
}

metrics.Timer("opa_wasm_vm_eval_extract_result").Start()
resultAddr, err := i.evalCtxGetResult(ctxAddr)
if err != nil {
return nil, err
}

result, err := i.fromRegoJSON(resultAddr.ToI32(), false)
metrics.Timer("opa_wasm_vm_eval_extract_result").Stop()

// Skip free'ing input and result JSON as the heap will be reset next round anyway.

Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
RegoInputParse = "rego_input_parse"
RegoLoadFiles = "rego_load_files"
RegoLoadBundles = "rego_load_bundles"
RegoExternalResolve = "rego_external_resolve"
)

// Info contains attributes describing the underlying metrics provider.
Expand Down
6 changes: 4 additions & 2 deletions resolver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"

"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
)

// Resolver defines an external value resolver for OPA evaluations.
Expand All @@ -17,8 +18,9 @@ type Resolver interface {

// Input as provided to a Resolver instance when evaluating.
type Input struct {
Ref ast.Ref
Input *ast.Term
Ref ast.Ref
Input *ast.Term
Metrics metrics.Metrics
}

// Result of resolving a ref.
Expand Down
9 changes: 8 additions & 1 deletion resolver/wasm/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func (r *Resolver) Close() {
// Eval performs an evaluation using the provided input and the Wasm module
// associated with this Resolver instance.
func (r *Resolver) Eval(ctx context.Context, input resolver.Input) (resolver.Result, error) {
input.Metrics.Timer("wasm_resolver_eval").Start()
defer input.Metrics.Timer("wasm_resolver_eval").Stop()

var inp *interface{}

Expand All @@ -106,7 +108,12 @@ func (r *Resolver) Eval(ctx context.Context, input resolver.Input) (resolver.Res
return resolver.Result{}, fmt.Errorf("internal error: invalid entrypoint id %s", numValue)
}

out, err := r.o.Eval(ctx, opa.EntrypointID(epID), inp)
opts := opa.EvalOpts{
Input: inp,
Entrypoint: opa.EntrypointID(epID),
Metrics: input.Metrics,
}
out, err := r.o.Eval(ctx, opts)
if err != nil {
return resolver.Result{}, err
}
Expand Down
3 changes: 3 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1825,6 +1825,7 @@ func TestDataMetricsEval(t *testing.T) {
"timer_rego_query_compile_ns",
"timer_rego_query_eval_ns",
"timer_server_handler_ns",
"timer_rego_external_resolve_ns",
})

// Repeat previous request, expect to have hit the query cache
Expand All @@ -1834,6 +1835,7 @@ func TestDataMetricsEval(t *testing.T) {
"timer_rego_input_parse_ns",
"timer_rego_query_eval_ns",
"timer_server_handler_ns",
"timer_rego_external_resolve_ns",
})

// Make a request to evaluate `data` and use partial evaluation,
Expand All @@ -1848,6 +1850,7 @@ func TestDataMetricsEval(t *testing.T) {
"timer_rego_query_eval_ns",
"timer_rego_partial_eval_ns",
"timer_server_handler_ns",
"timer_rego_external_resolve_ns",
})

// Repeat previous partial eval request, this time it should
Expand Down
12 changes: 9 additions & 3 deletions topdown/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package topdown

import (
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/metrics"
"github.com/open-policy-agent/opa/resolver"
)

Expand All @@ -32,10 +33,15 @@ func (t *resolverTrie) Put(ref ast.Ref, r resolver.Resolver) {
}

func (t *resolverTrie) Resolve(e *eval, ref ast.Ref) (ast.Value, error) {
e.metrics.Timer(metrics.RegoExternalResolve).Start()
defer e.metrics.Timer(metrics.RegoExternalResolve).Stop()

in := resolver.Input{
Ref: ref,
Input: e.input,
Ref: ref,
Input: e.input,
Metrics: e.metrics,
}

node := t
for i, t := range ref {
child, ok := node.children[t.Value]
Expand Down Expand Up @@ -72,7 +78,7 @@ func (t *resolverTrie) mktree(e *eval, in resolver.Input) (ast.Value, error) {
}
obj := ast.NewObject()
for k, child := range t.children {
v, err := child.mktree(e, resolver.Input{Ref: append(in.Ref, ast.NewTerm(k)), Input: in.Input})
v, err := child.mktree(e, resolver.Input{Ref: append(in.Ref, ast.NewTerm(k)), Input: in.Input, Metrics: in.Metrics})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 197c838

Please sign in to comment.