Skip to content

Commit

Permalink
Bloblang changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Feb 24, 2022
1 parent 5e2c9f4 commit cec72e7
Show file tree
Hide file tree
Showing 33 changed files with 253 additions and 360 deletions.
2 changes: 1 addition & 1 deletion config/examples/track_benthos_downloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ processor_resources:
- bloblang: |
root = this.map_each(release -> release.assets.map_each(asset -> {
"source": "github",
"dist": asset.name.re_replace("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
"dist": asset.name.re_replace_all("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
"download_count": asset.download_count,
"version": release.tag_name.trim("v"),
}).filter(asset -> asset.dist != "checksums")).flatten()
Expand Down
4 changes: 2 additions & 2 deletions config/test/bloblang/env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ tests:
input_batch:
- content: '{}'
output_batches:
- - content_equals: '{"bar_env":"","foo_env":"fooval"}'
- - content_equals: '{"bar_env":null,"foo_env":"fooval"}'

- name: neither exists
target_processors: /pipeline/processors
environment: {}
input_batch:
- content: '{}'
output_batches:
- - content_equals: '{"bar_env":"","foo_env":""}'
- - content_equals: '{"bar_env":null,"foo_env":null}'

2 changes: 1 addition & 1 deletion config/test/bloblang/github_releases.blobl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
root = this.map_each(release -> release.assets.map_each(asset -> {
"source": "github",
"dist": asset.name.re_replace("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
"dist": asset.name.re_replace_all("^benthos-?((lambda_)|_)[0-9\\.]+(-rc[0-9]+)?_([^\\.]+).*", "$2$4"),
"download_count": asset.download_count,
"version": release.tag_name.trim("v"),
}).filter(asset -> asset.dist != "checksums")).flatten()
2 changes: 1 addition & 1 deletion internal/bloblang/field/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestExpressions(t *testing.T) {
}()),
),
numDyn: 1,
output: ``,
output: `null`,
messages: []easyMsg{
{content: `hello world`, meta: map[string]string{
"bar": "from bar",
Expand Down
4 changes: 2 additions & 2 deletions internal/bloblang/field/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (q QueryResolver) ResolveString(index int, msg Message, escaped, legacy boo
Index: index,
MsgBatch: msg,
Legacy: legacy,
NewMsg: msg.Get(index),
NewMeta: msg.Get(index),
}.WithValueFunc(func() *interface{} {
if jObj, err := msg.Get(index).JSON(); err == nil {
return &jObj
Expand All @@ -71,7 +71,7 @@ func (q QueryResolver) ResolveBytes(index int, msg Message, escaped, legacy bool
Index: index,
MsgBatch: msg,
Legacy: legacy,
NewMsg: msg.Get(index),
NewMeta: msg.Get(index),
}.WithValueFunc(func() *interface{} {
if jObj, err := msg.Get(index).JSON(); err == nil {
return &jObj
Expand Down
25 changes: 15 additions & 10 deletions internal/bloblang/mapping/assignment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ import (
"strings"

"github.com/Jeffail/benthos/v3/internal/bloblang/query"
"github.com/Jeffail/benthos/v3/lib/message"
"github.com/Jeffail/gabs/v2"
)

//------------------------------------------------------------------------------

type metaMsg interface {
MetaSet(key, value string)
MetaDelete(key string)
MetaIter(f func(k, v string) error) error
}

// AssignmentContext contains references to all potential assignment
// destinations of a given mapping.
type AssignmentContext struct {
Vars map[string]interface{}
Msg *message.Part
Meta metaMsg
Value *interface{}
}

Expand Down Expand Up @@ -76,24 +81,24 @@ func NewMetaAssignment(key *string) *MetaAssignment {

// Apply a value to a metadata key.
func (m *MetaAssignment) Apply(value interface{}, ctx AssignmentContext) error {
if ctx.Msg == nil {
if ctx.Meta == nil {
return errors.New("unable to assign metadata in the current context")
}
_, deleted := value.(query.Delete)
if m.key == nil {
if deleted {
_ = ctx.Msg.MetaIter(func(k, _ string) error {
ctx.Msg.MetaDelete(k)
_ = ctx.Meta.MetaIter(func(k, _ string) error {
ctx.Meta.MetaDelete(k)
return nil
})
} else {
if m, ok := value.(map[string]interface{}); ok {
_ = ctx.Msg.MetaIter(func(k, _ string) error {
ctx.Msg.MetaDelete(k)
_ = ctx.Meta.MetaIter(func(k, _ string) error {
ctx.Meta.MetaDelete(k)
return nil
})
for k, v := range m {
ctx.Msg.MetaSet(k, query.IToString(v))
ctx.Meta.MetaSet(k, query.IToString(v))
}
} else {
return fmt.Errorf("setting root meta object requires object value, received: %T", value)
Expand All @@ -102,9 +107,9 @@ func (m *MetaAssignment) Apply(value interface{}, ctx AssignmentContext) error {
return nil
}
if deleted {
ctx.Msg.MetaDelete(*m.key)
ctx.Meta.MetaDelete(*m.key)
} else {
ctx.Msg.MetaSet(*m.key, query.IToString(value))
ctx.Meta.MetaSet(*m.key, query.IToString(value))
}
return nil
}
Expand Down
7 changes: 5 additions & 2 deletions internal/bloblang/mapping/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ func (e *Executor) mapPart(appendTo *message.Part, index int, reference Message)
Vars: vars,
Index: index,
MsgBatch: reference,
NewMsg: newPart,
NewMeta: newPart,
NewValue: &newValue,
}.WithValueFunc(lazyValue))
if err != nil {
var line int
Expand All @@ -190,7 +191,7 @@ func (e *Executor) mapPart(appendTo *message.Part, index int, reference Message)
}
if err = stmt.assignment.Apply(res, AssignmentContext{
Vars: vars,
Msg: newPart,
Meta: newPart,
Value: &newValue,
}); err != nil {
var line int
Expand Down Expand Up @@ -256,6 +257,8 @@ func (e *Executor) Exec(ctx query.FunctionContext) (interface{}, error) {
}

var newObj interface{} = query.Nothing(nil)
ctx.NewValue = &newObj

for _, stmt := range e.statements {
res, err := stmt.query.Exec(ctx)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/bloblang/mapping/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func TestAssignments(t *testing.T) {
mapping: NewExecutor("", nil, nil,
NewStatement(nil, NewJSONAssignment("foo"), initFunc("meta", "foo")),
),
input: []part{{Content: `{}`}},
err: errors.New("failed assignment (line 0): metadata value 'foo' not found"),
input: []part{{Content: `{}`}},
output: &part{Content: `{"foo":null}`},
},
"meta assignment": {
mapping: NewExecutor("", nil, nil,
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestQueries(t *testing.T) {
NewStatement(nil, NewJSONAssignment("foo"), initFunc("meta", "foo")),
),
input: []part{{Content: `{}`}},
err: errors.New("failed assignment (line 0): metadata value 'foo' not found"),
err: errors.New("expected bool value, got object from mapping"),
},
}

Expand Down
61 changes: 60 additions & 1 deletion internal/bloblang/package_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,64 @@ func TestMappings(t *testing.T) {
mapping.NewTargetPath(mapping.TargetValue, "baz", "buz", "0", "bev"),
},
},
"root copies to root": {
mapping: `
root = this
root.first = root
root.second = root
`,
input: map[string]interface{}{
"foo": "bar",
},
output: map[string]interface{}{
"foo": "bar",
"first": map[string]interface{}{
"foo": "bar",
},
"second": map[string]interface{}{
"foo": "bar",
"first": map[string]interface{}{
"foo": "bar",
},
},
},
assignmentTargets: []mapping.TargetPath{
mapping.NewTargetPath(mapping.TargetValue),
mapping.NewTargetPath(mapping.TargetValue, "first"),
mapping.NewTargetPath(mapping.TargetValue, "second"),
},
queryTargets: []query.TargetPath{
query.NewTargetPath(query.TargetValue),
query.NewTargetPath(query.TargetRoot),
query.NewTargetPath(query.TargetRoot),
},
},
"root edit from map": {
mapping: `
map foo {
root.from_map = "hello world"
root = root.from_map
}
root = this
root.meow = this.apply("foo")
`,
input: map[string]interface{}{
"foo": "bar",
},
output: map[string]interface{}{
"foo": "bar",
"meow": "hello world",
},
assignmentTargets: []mapping.TargetPath{
mapping.NewTargetPath(mapping.TargetValue),
mapping.NewTargetPath(mapping.TargetValue, "meow"),
},
queryTargets: []query.TargetPath{
query.NewTargetPath(query.TargetValue),
query.NewTargetPath(query.TargetValue),
query.NewTargetPath(query.TargetRoot, "from_map"),
},
},
}

for name, test := range tests {
Expand All @@ -91,12 +149,13 @@ func TestMappings(t *testing.T) {
assert.Equal(t, test.assignmentTargets, m.AssignmentTargets())

_, targets := m.QueryTargets(query.TargetsContext{
Maps: map[string]query.Function{},
Maps: m.Maps(),
})
assert.Equal(t, test.queryTargets, targets)

res, err := m.Exec(query.FunctionContext{
MsgBatch: message.QuickBatch(nil),
Maps: m.Maps(),
Vars: map[string]interface{}{},
}.WithValue(test.input))
require.NoError(t, err)
Expand Down
4 changes: 1 addition & 3 deletions internal/bloblang/parser/query_function_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ func variableLiteralParser() Func {
}
}

var errNoRoot = errors.New("unable to reference the `root` of your mapped document within a query. This feature will be introduced soon, but in the meantime in order to use a mapped value multiple times use variables (https://www.benthos.dev/docs/guides/bloblang/about#variables). If instead you wish to refer to a field `root` from your input document use `this.root`")

func fieldLiteralRootParser(pCtx Context) Func {
fieldPathParser := Expect(
JoinStringPayloads(
Expand Down Expand Up @@ -252,7 +250,7 @@ func fieldLiteralRootParser(pCtx Context) Func {
if path == "this" {
fn = query.NewFieldFunction("")
} else if path == "root" {
return Fail(NewFatalError(input, errNoRoot), input)
fn = query.NewRootFieldFunction("")
} else {
if pCtx.HasNamedContext(path) {
fn = query.NewNamedContextFieldFunction(path, "")
Expand Down
40 changes: 3 additions & 37 deletions internal/bloblang/parser/query_function_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package parser
import (
"os"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -224,7 +223,7 @@ bar""")`,
},
"metadata 2": {
input: `meta("bar")`,
output: "",
output: "null",
messages: []easyMsg{
{
meta: map[string]string{
Expand Down Expand Up @@ -278,7 +277,7 @@ bar""")`,
},
"metadata 6": {
input: `meta("foo")`,
output: ``,
output: `null`,
index: 1,
messages: []easyMsg{
{
Expand Down Expand Up @@ -329,7 +328,7 @@ bar""")`,
},
"error function 2": {
input: `error().from(1)`,
output: ``,
output: `null`,
messages: []easyMsg{
{meta: map[string]string{
imessage.FailFlagKey: "test error",
Expand Down Expand Up @@ -807,37 +806,4 @@ func TestTimestamps(t *testing.T) {
if tThen.Sub(now).Seconds() > 5.0 {
t.Errorf("Timestamps too far out of sync: %v and %v", tThen, now)
}

now = time.Now()
e, perr = tryParseQuery("timestamp()", false)
require.Nil(t, perr)

tStamp = query.ExecToString(e, query.FunctionContext{MsgBatch: message.QuickBatch(nil)})

tThen, err = time.Parse("Mon Jan 2 15:04:05 -0700 MST 2006", tStamp)
if err != nil {
t.Fatal(err)
}

if tThen.Sub(now).Seconds() > 5.0 {
t.Errorf("Timestamps too far out of sync: %v and %v", tThen, now)
}

now = time.Now()
e, perr = tryParseQuery("timestamp_utc()", false)
require.Nil(t, perr)

tStamp = query.ExecToString(e, query.FunctionContext{MsgBatch: message.QuickBatch(nil)})

tThen, err = time.Parse("Mon Jan 2 15:04:05 -0700 MST 2006", tStamp)
if err != nil {
t.Fatal(err)
}

if tThen.Sub(now).Seconds() > 5.0 {
t.Errorf("Timestamps too far out of sync: %v and %v", tThen, now)
}
if !strings.Contains(tStamp, "UTC") {
t.Errorf("Non-UTC timezone detected: %v", tStamp)
}
}
2 changes: 1 addition & 1 deletion internal/bloblang/parser/query_method_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestMethodParser(t *testing.T) {
},
"meta from all": {
input: `meta("foo").from_all()`,
output: `["bar","","baz"]`,
output: `["bar",null,"baz"]`,
messages: []easyMsg{
{meta: map[string]string{"foo": "bar"}},
{},
Expand Down
Loading

0 comments on commit cec72e7

Please sign in to comment.