diff --git a/.golangci.yml b/.golangci.yml index 45bc553188..e8739dadde 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -35,23 +35,22 @@ linters-settings: linters: disable-all: true enable: - # Default linters reported by `golangci-lint help linters` in v1.41.1 - # Disabled for Go 1.18 - - gosimple - - staticcheck - - unused + # Default linters reported by `golangci-lint help linters` in v1.52.0 - errcheck + - gosimple - govet - ineffassign + - staticcheck - typecheck + - unused # Extra linters: - # Disabled for Go 1.18 - # - wastedassign + - wastedassign - stylecheck - gofmt - goimports - - gocritic - - revive + # gocritic is very slow (golangci-lint v1.52.0) + # - gocritic + # - revive - unconvert - durationcheck - depguard diff --git a/CHANGELOG.md b/CHANGELOG.md index 99eaf6ed8f..1de43b0a33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,15 +5,24 @@ All notable changes to this project will be documented in this file. ## Unreleased +### Fixed + +- The `find_all` bloblang method no longer produces results that are of an `unknown` type. + +## 4.13.0 - 2023-03-15 + ### Added +- Fix vulnerability [GO-2023-1571](https://pkg.go.dev/vuln/GO-2023-1571) - New `nats_kv` processor, input and output. +- Field `partition` added to the `kafka_franz` output, allowing for manual partitioning. ### Fixed - The `broker` output with the pattern `fan_out_sequential` will no longer abandon in-flight requests that are error blocked until the full shutdown timeout has occurred. - The `broker` input no longer reports itself as unavailable when a child input has intentionally closed. -- The `find_all` bloblang method no longer produces results that are of an `unknown` type. +- Config unit tests that check for structured data should no longer fail in all cases. +- The `http_server` input with a custom address now supports path variables. ## 4.12.1 - 2023-02-23 diff --git a/config/test/structured_metadata.yaml b/config/test/structured_metadata.yaml new file mode 100644 index 0000000000..0ca0647909 --- /dev/null +++ b/config/test/structured_metadata.yaml @@ -0,0 +1,22 @@ +input: + stdin: + codec: lines +pipeline: + processors: + - mapping: | + meta foo = { "a": "hello" } + meta bar = { "b": { "c": "hello" } } + meta baz = [ { "a": "hello" }, { "b": { "c": "hello" } } ] +output: + stdout: + codec: lines + +tests: + - name: Should not fail + input_batch: + - content: hello + output_batches: + - - metadata_equals: + foo: { "a": "hello" } + bar: { "b": { "c": "hello" } } + baz: [ { "a": "hello" }, { "b": { "c": "hello" } } ] diff --git a/go.mod b/go.mod index e9f3b8aaa9..fc8de2e699 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/itchyny/timefmt-go v0.1.5 github.com/jhump/protoreflect v1.14.1 github.com/jmespath/go-jmespath v0.4.0 - github.com/klauspost/compress v1.15.15 + github.com/klauspost/compress v1.16.3 github.com/lib/pq v1.10.4 github.com/linkedin/goavro/v2 v2.12.0 github.com/matoous/go-nanoid/v2 v2.0.0 @@ -91,7 +91,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tetratelabs/wazero v1.0.0-pre.9 github.com/tilinna/z85 v1.0.0 - github.com/twmb/franz-go v1.12.1 + github.com/twmb/franz-go v1.13.0 github.com/twmb/franz-go/pkg/kmsg v1.4.0 github.com/urfave/cli/v2 v2.11.0 github.com/vmihailenco/msgpack/v5 v5.3.5 @@ -110,11 +110,11 @@ require ( go.opentelemetry.io/otel/sdk v1.13.0 go.opentelemetry.io/otel/trace v1.13.0 go.uber.org/multierr v1.9.0 - golang.org/x/crypto v0.6.0 - golang.org/x/net v0.6.0 + golang.org/x/crypto v0.7.0 + golang.org/x/net v0.8.0 golang.org/x/oauth2 v0.5.0 golang.org/x/sync v0.1.0 - golang.org/x/text v0.7.0 + golang.org/x/text v0.8.0 google.golang.org/api v0.103.0 google.golang.org/grpc v1.53.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 @@ -272,8 +272,8 @@ require ( go.uber.org/atomic v1.10.0 // indirect golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/sys v0.5.0 // indirect - golang.org/x/term v0.5.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/term v0.6.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gonum.org/v1/gonum v0.11.0 // indirect diff --git a/go.sum b/go.sum index 14cea328b4..1e97b6d427 100644 --- a/go.sum +++ b/go.sum @@ -702,9 +702,8 @@ github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYs github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= -github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= @@ -872,7 +871,6 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= @@ -1043,8 +1041,8 @@ github.com/trivago/grok v1.0.0/go.mod h1:9t59xLInhrncYq9a3J7488NgiBZi5y5yC7bss+w github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM= github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/twmb/franz-go v1.12.1 h1:8lWT8q0spL40Nfw6eonJ8OoPGLvF9arvadRRmcSiu9Y= -github.com/twmb/franz-go v1.12.1/go.mod h1:Ofc5tSSUJKLmpRNUYSejUsAZKYAHDHywTS322KWdChQ= +github.com/twmb/franz-go v1.13.0 h1:J4VyTXVlOhiCDCXS56ut2ZRAylaimPXnIqtCq9Wlfbw= +github.com/twmb/franz-go v1.13.0/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kmsg v1.4.0 h1:tbp9hxU6m8qZhQTlpGiaIJOm4BXix5lsuEZ7K00dF0s= github.com/twmb/franz-go/pkg/kmsg v1.4.0/go.mod h1:SxG/xJKhgPu25SamAq0rrucfp7lbzCpEXOC+vH/ELrY= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= @@ -1173,9 +1171,8 @@ golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220817201139-bc19a97f63c8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= -golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= +golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1286,8 +1283,8 @@ golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= -golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1407,13 +1404,13 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1423,8 +1420,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/checkpoint/capped.go b/internal/checkpoint/capped.go index 464c9f511e..bde527220f 100644 --- a/internal/checkpoint/capped.go +++ b/internal/checkpoint/capped.go @@ -3,8 +3,6 @@ package checkpoint import ( "context" "sync" - - "github.com/benthosdev/benthos/v4/internal/component" ) // Capped receives an ordered feed of integer based offsets being tracked, and @@ -51,18 +49,14 @@ func (c *Capped[T]) Track(ctx context.Context, payload T, batchSize int64) (func defer cancel() go func() { <-ctx.Done() - c.cond.L.Lock() c.cond.Broadcast() - c.cond.L.Unlock() }() pending := c.t.Pending() for pending > 0 && pending+batchSize > c.cap { c.cond.Wait() - select { - case <-ctx.Done(): - return nil, component.ErrTimeout - default: + if err := ctx.Err(); err != nil { + return nil, err } pending = c.t.Pending() } diff --git a/internal/checkpoint/uncapped.go b/internal/checkpoint/uncapped.go index d33b48c017..79ceb019fd 100644 --- a/internal/checkpoint/uncapped.go +++ b/internal/checkpoint/uncapped.go @@ -7,8 +7,8 @@ package checkpoint // Also keeps track of the logical size of the unresolved sequence, which allows // for limiting the number of pending checkpoints. type Uncapped[T any] struct { - positionOffset int64 - checkpoint *T + checkpointPosition int64 + checkpoint *T start, end *node[T] } @@ -35,6 +35,8 @@ func (t *Uncapped[T]) Track(payload T, batchSize int64) func() *T { newNode.prev = t.end newNode.position += t.end.position t.end.next = newNode + } else { + newNode.position += t.checkpointPosition } t.end = newNode @@ -47,7 +49,7 @@ func (t *Uncapped[T]) Track(payload T, batchSize int64) func() *T { } else { tmp := newNode.payload t.checkpoint = &tmp - t.positionOffset = newNode.position + t.checkpointPosition = newNode.position t.start = newNode.next } @@ -55,9 +57,6 @@ func (t *Uncapped[T]) Track(payload T, batchSize int64) func() *T { newNode.next.prev = newNode.prev } else { t.end = newNode.prev - if t.end == nil { - t.positionOffset = 0 - } } return t.checkpoint } @@ -68,7 +67,7 @@ func (t *Uncapped[T]) Pending() int64 { if t.end == nil { return 0 } - return t.end.position - t.positionOffset + return t.end.position - t.checkpointPosition } // Highest returns the payload of the highest resolved checkpoint. diff --git a/internal/cli/run.go b/internal/cli/run.go index cd84d2410d..87ecbf849f 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -126,10 +126,10 @@ func Run() { Value: false, Usage: "display version info, then exit", }, - &cli.StringFlag{ + &cli.StringSliceFlag{ Name: "env-file", Aliases: []string{"e"}, - Value: "", + Value: cli.NewStringSlice(), Usage: "import environment variables from a dotenv file", }, &cli.StringFlag{ @@ -186,7 +186,7 @@ Either run Benthos as a stream processor or choose a command: benthos -r "./production/*.yaml" -c ./config.yaml`[1:], Flags: flags, Before: func(c *cli.Context) error { - if dotEnvFile := c.String("env-file"); dotEnvFile != "" { + for _, dotEnvFile := range c.StringSlice("env-file") { dotEnvBytes, err := ifs.ReadFile(ifs.OS(), dotEnvFile) if err != nil { fmt.Printf("Failed to read dotenv file: %v\n", err) diff --git a/internal/cli/test/condition.go b/internal/cli/test/condition.go index 24523dc964..2eff41ba96 100644 --- a/internal/cli/test/condition.go +++ b/internal/cli/test/condition.go @@ -88,11 +88,11 @@ func (c *ConditionsMap) UnmarshalYAML(value *yaml.Node) error { } cond = val case "metadata_equals": - val := MetadataEqualsCondition{} - if err := v.Decode(&val); err != nil { + root := map[string]any{} + if err := v.Decode(&root); err != nil { return fmt.Errorf("line %v: %v", v.Line, err) } - cond = val + cond = MetadataEqualsCondition(root) default: return fmt.Errorf("line %v: message part condition type not recognised: %v", v.Line, k) } diff --git a/internal/docs/field.go b/internal/docs/field.go index 8517f88d5d..6c4bd7ce2a 100644 --- a/internal/docs/field.go +++ b/internal/docs/field.go @@ -434,8 +434,9 @@ func (f FieldSpec) getLintFunc() LintFunc { } if f.Interpolated { if fn != nil { + innerFn := fn fn = func(ctx LintContext, line, col int, value any) []Lint { - lints := f.customLintFn(ctx, line, col, value) + lints := innerFn(ctx, line, col, value) moreLints := LintBloblangField(ctx, line, col, value) return append(lints, moreLints...) } @@ -445,8 +446,9 @@ func (f FieldSpec) getLintFunc() LintFunc { } if f.Bloblang { if fn != nil { + innerFn := fn fn = func(ctx LintContext, line, col int, value any) []Lint { - lints := f.customLintFn(ctx, line, col, value) + lints := innerFn(ctx, line, col, value) moreLints := LintBloblangMapping(ctx, line, col, value) return append(lints, moreLints...) } diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 19951540ef..3adcb15665 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -225,8 +225,8 @@ func (h *Client) waitForAccess(ctx context.Context) bool { } // ResponseToBatch attempts to parse an HTTP response into a 2D slice of bytes. -func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err error) { - resMsg = message.QuickBatch(nil) +func (h *Client) ResponseToBatch(res *http.Response) (message.Batch, error) { + resMsg := message.QuickBatch(nil) annotatePart := func(p *message.Part) { p.MetaSetMut("http_status_code", res.StatusCode) @@ -244,17 +244,17 @@ func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err nextPart := message.NewPart(nil) annotatePart(nextPart) resMsg = append(resMsg, nextPart) - return + return resMsg, nil } defer res.Body.Close() var mediaType string var params map[string]string + var err error if contentType := res.Header.Get("Content-Type"); len(contentType) > 0 { if mediaType, params, err = mime.ParseMediaType(contentType); err != nil { h.log.Warnf("Failed to parse media type from Content-Type header: %v\n", err) } - err = nil } var buffer bytes.Buffer @@ -262,7 +262,7 @@ func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err var bytesRead int64 if bytesRead, err = buffer.ReadFrom(res.Body); err != nil { h.log.Errorf("Failed to read response: %v\n", err) - return + return resMsg, err } nextPart := message.NewPart(nil) @@ -272,7 +272,7 @@ func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err annotatePart(nextPart) resMsg = append(resMsg, nextPart) - return + return resMsg, nil } mr := multipart.NewReader(res.Body, params["boundary"]) @@ -281,16 +281,15 @@ func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err var p *multipart.Part if p, err = mr.NextPart(); err != nil { if err == io.EOF { - err = nil break } - return + return resMsg, err } var bytesRead int64 if bytesRead, err = buffer.ReadFrom(p); err != nil { h.log.Errorf("Failed to read response: %v\n", err) - return + return resMsg, err } nextPart := message.NewPart(buffer.Bytes()[bufferIndex : bufferIndex+bytesRead]) @@ -299,7 +298,8 @@ func (h *Client) ResponseToBatch(res *http.Response) (resMsg message.Batch, err annotatePart(nextPart) resMsg = append(resMsg, nextPart) } - return + + return resMsg, nil } type retryStrategy int diff --git a/internal/impl/aws/input_kinesis_record_batcher.go b/internal/impl/aws/input_kinesis_record_batcher.go index 93ab73bc55..469377ccec 100644 --- a/internal/impl/aws/input_kinesis_record_batcher.go +++ b/internal/impl/aws/input_kinesis_record_batcher.go @@ -79,7 +79,8 @@ func (a *awsKinesisRecordBatcher) FlushMessage(ctx context.Context) (asyncMessag resolveFn, err := a.checkpointer.Track(ctx, a.batchedSequence, int64(a.flushedMessage.Len())) if err != nil { - if errors.Is(err, component.ErrTimeout) { + if ctx.Err() != nil || errors.Is(err, component.ErrTimeout) { + // No need to log this error, just continue with no message. err = nil } return asyncMessage{}, err diff --git a/internal/impl/io/input_http_server.go b/internal/impl/io/input_http_server.go index 7e5a91e0e5..a3a091980d 100644 --- a/internal/impl/io/input_http_server.go +++ b/internal/impl/io/input_http_server.go @@ -138,7 +138,7 @@ type httpServerInput struct { log log.Modular mgr bundle.NewManagement - mux *http.ServeMux + mux *mux.Router server *http.Server timeout time.Duration @@ -159,14 +159,14 @@ type httpServerInput struct { } func newHTTPServerInput(conf input.Config, mgr bundle.NewManagement) (input.Streamed, error) { - var mux *http.ServeMux + var gMux *mux.Router var server *http.Server var err error if len(conf.HTTPServer.Address) > 0 { - mux = http.NewServeMux() + gMux = mux.NewRouter() server = &http.Server{Addr: conf.HTTPServer.Address} - if server.Handler, err = conf.HTTPServer.CORS.WrapHandler(mux); err != nil { + if server.Handler, err = conf.HTTPServer.CORS.WrapHandler(gMux); err != nil { return nil, fmt.Errorf("bad CORS configuration: %w", err) } } @@ -192,7 +192,7 @@ func newHTTPServerInput(conf input.Config, mgr bundle.NewManagement) (input.Stre conf: conf.HTTPServer, log: mgr.Logger(), mgr: mgr, - mux: mux, + mux: gMux, server: server, timeout: timeout, responseHeaders: map[string]*field.Expression{}, @@ -220,12 +220,12 @@ func newHTTPServerInput(conf input.Config, mgr bundle.NewManagement) (input.Stre postHdlr := gzipHandler(h.postHandler) wsHdlr := gzipHandler(h.wsHandler) - if mux != nil { + if gMux != nil { if len(h.conf.Path) > 0 { - mux.HandleFunc(h.conf.Path, postHdlr) + gMux.HandleFunc(h.conf.Path, postHdlr) } if len(h.conf.WSPath) > 0 { - mux.HandleFunc(h.conf.WSPath, wsHdlr) + gMux.HandleFunc(h.conf.WSPath, wsHdlr) } } else { if len(h.conf.Path) > 0 { diff --git a/internal/impl/io/input_http_server_test.go b/internal/impl/io/input_http_server_test.go index eeb21479a0..d7a976b85d 100644 --- a/internal/impl/io/input_http_server_test.go +++ b/internal/impl/io/input_http_server_test.go @@ -28,6 +28,7 @@ import ( "github.com/benthosdev/benthos/v4/internal/component/metrics" "github.com/benthosdev/benthos/v4/internal/log" "github.com/benthosdev/benthos/v4/internal/manager" + "github.com/benthosdev/benthos/v4/internal/manager/mock" "github.com/benthosdev/benthos/v4/internal/message" "github.com/benthosdev/benthos/v4/internal/transaction" @@ -419,6 +420,68 @@ func TestHTTPtServerPathParameters(t *testing.T) { assert.Equal(t, "will go on", part.MetaGetStr("mylove")) } +func TestHTTPtServerPathParametersCustomServer(t *testing.T) { + tCtx, done := context.WithTimeout(context.Background(), time.Minute) + defer done() + + freePort, err := getFreePort() + require.NoError(t, err) + + conf := input.NewConfig() + conf.Type = "http_server" + conf.HTTPServer.Address = fmt.Sprintf("0.0.0.0:%v", freePort) + conf.HTTPServer.Path = "/test/{foo}/{bar}" + + server, err := mock.NewManager().NewInput(conf) + require.NoError(t, err) + + defer func() { + server.TriggerStopConsuming() + assert.NoError(t, server.WaitForClose(tCtx)) + }() + + dummyPath := "/test/foo1/bar1" + dummyQuery := url.Values{"mylove": []string{"will go on"}} + serverURL, err := url.Parse(fmt.Sprintf("http://localhost:%v", freePort)) + require.NoError(t, err) + + serverURL.Path = dummyPath + serverURL.RawQuery = dummyQuery.Encode() + + dummyData := []byte("a bunch of jolly leprechauns await") + go func() { + req, cerr := http.NewRequest("POST", serverURL.String(), bytes.NewReader(dummyData)) + require.NoError(t, cerr) + req.Header.Set("Content-Type", "text/plain") + resp, cerr := http.DefaultClient.Do(req) + require.NoError(t, cerr) + defer resp.Body.Close() + }() + + readNextMsg := func() (message.Batch, error) { + var tran message.Transaction + select { + case tran = <-server.TransactionChan(): + require.NoError(t, tran.Ack(tCtx, nil)) + case <-time.After(time.Second): + return nil, errors.New("timed out") + } + return tran.Payload, nil + } + + msg, err := readNextMsg() + require.NoError(t, err) + assert.Equal(t, dummyData, message.GetAllBytes(msg)[0]) + + part := msg.Get(0) + + assert.Equal(t, dummyPath, part.MetaGetStr("http_server_request_path")) + assert.Equal(t, "POST", part.MetaGetStr("http_server_verb")) + assert.Equal(t, "foo1", part.MetaGetStr("foo")) + assert.Equal(t, "bar1", part.MetaGetStr("bar")) + assert.Equal(t, "will go on", part.MetaGetStr("mylove")) +} + func TestHTTPBadRequests(t *testing.T) { t.Parallel() diff --git a/internal/impl/kafka/input_kafka_franz.go b/internal/impl/kafka/input_kafka_franz.go index fcbfb5612c..7504afdf7b 100644 --- a/internal/impl/kafka/input_kafka_franz.go +++ b/internal/impl/kafka/input_kafka_franz.go @@ -21,19 +21,13 @@ import ( func franzKafkaInputConfig() *service.ConfigSpec { return service.NewConfigSpec(). - // Stable(). TODO + Beta(). Categories("Services"). Version("3.61.0"). Summary("An alternative Kafka input using the [Franz Kafka client library](https://github.com/twmb/franz-go)."). Description(` Consumes one or more topics by balancing the partitions across any other connected clients with the same consumer group. -This input is new and experimental, and the existing ` + "`kafka`" + ` input is not going anywhere, but here's some reasons why it might be worth trying this one out: - -- You like shiny new stuff -- You are experiencing issues with the existing ` + "`kafka`" + ` input -- Someone told you to - ### Metadata This input adds the following metadata fields to each message: diff --git a/internal/impl/kafka/input_sarama_kafka.go b/internal/impl/kafka/input_sarama_kafka.go index 1f84ab6494..040ec1b432 100644 --- a/internal/impl/kafka/input_sarama_kafka.go +++ b/internal/impl/kafka/input_sarama_kafka.go @@ -309,7 +309,7 @@ func (k *kafkaReader) asyncCheckpointer(topic string, partition int32) func(cont } resolveFn, err := cp.Track(ctx, offset, int64(msg.Len())) if err != nil { - if err != component.ErrTimeout { + if ctx.Err() == nil && err != component.ErrTimeout { k.log.Errorf("Failed to checkpoint offset: %v\n", err) } return false @@ -324,7 +324,7 @@ func (k *kafkaReader) asyncCheckpointer(topic string, partition int32) func(cont } k.cMut.Lock() if k.session != nil { - k.log.Debugf("Marking offset for topic '%v' partition '%v'.\n", topic, partition) + k.log.Tracef("Marking offset for topic '%v' partition '%v'.\n", topic, partition) k.session.MarkOffset(topic, partition, *maxOffset, "") } else { k.log.Debugf("Unable to mark offset for topic '%v' partition '%v'.\n", topic, partition) diff --git a/internal/impl/kafka/integration_sarama_test.go b/internal/impl/kafka/integration_sarama_test.go index 2fab22f077..5dc8c8df28 100644 --- a/internal/impl/kafka/integration_sarama_test.go +++ b/internal/impl/kafka/integration_sarama_test.go @@ -5,6 +5,7 @@ import ( "fmt" "runtime" "strconv" + "sync" "testing" "time" @@ -18,8 +19,167 @@ import ( "github.com/benthosdev/benthos/v4/internal/integration" "github.com/benthosdev/benthos/v4/internal/manager/mock" "github.com/benthosdev/benthos/v4/internal/message" + "github.com/benthosdev/benthos/v4/public/service" ) +func TestIntegrationSaramaCheckpointOneLockUp(t *testing.T) { + integration.CheckSkipExact(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + + pool.MaxWait = time.Minute + + kafkaPort, err := integration.GetFreePort() + require.NoError(t, err) + + kafkaPortStr := strconv.Itoa(kafkaPort) + + options := &dockertest.RunOptions{ + Repository: "docker.vectorized.io/vectorized/redpanda", + Tag: "latest", + Hostname: "redpanda", + ExposedPorts: []string{"9092"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}}, + }, + Cmd: []string{ + "redpanda", "start", "--smp 1", "--overprovisioned", "", + "--kafka-addr 0.0.0.0:9092", + fmt.Sprintf("--advertise-kafka-addr localhost:%v", kafkaPort), + }, + } + resource, err := pool.RunWithOptions(options) + require.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + _ = resource.Expire(900) + require.NoError(t, pool.Retry(func() error { + return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "wcotesttopic", 20) + })) + + dl, exists := t.Deadline() + if exists { + dl = dl.Add(-time.Second) + } else { + dl = time.Now().Add(time.Minute) + } + testCtx, done := context.WithTimeout(context.Background(), time.Until(dl)) + defer done() + + writeCtx, writeDone := context.WithCancel(testCtx) + defer writeDone() + + // Create data generator stream + inBuilder := service.NewStreamBuilder() + require.NoError(t, inBuilder.AddOutputYAML(fmt.Sprintf(` +kafka: + addresses: [ "localhost:%v" ] + topic: topic-wcotesttopic + max_in_flight: 1 +`, kafkaPortStr))) + + inFunc, err := inBuilder.AddProducerFunc() + require.NoError(t, err) + + inStrm, err := inBuilder.Build() + require.NoError(t, err) + go func() { + assert.NoError(t, inStrm.Run(testCtx)) + }() + + // Create two parallel data consumer streams + var messageCountMut sync.Mutex + var inMessages, outMessagesOne, outMessagesTwo int + + outBuilderConf := fmt.Sprintf(` +kafka: + addresses: [ "localhost:%v" ] + topics: [ topic-wcotesttopic ] + consumer_group: wcotestgroup + checkpoint_limit: 1 + start_from_oldest: true +`, kafkaPortStr) + + outBuilder := service.NewStreamBuilder() + require.NoError(t, outBuilder.AddInputYAML(outBuilderConf)) + require.NoError(t, outBuilder.AddProcessorYAML(`mapping: 'root = content().uppercase()'`)) + require.NoError(t, outBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { + messageCountMut.Lock() + outMessagesOne++ + messageCountMut.Unlock() + return nil + })) + outStrmOne, err := outBuilder.Build() + require.NoError(t, err) + go func() { + assert.NoError(t, outStrmOne.Run(testCtx)) + }() + + outBuilder = service.NewStreamBuilder() + require.NoError(t, outBuilder.AddInputYAML(outBuilderConf)) + require.NoError(t, outBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error { + messageCountMut.Lock() + outMessagesTwo++ + messageCountMut.Unlock() + return nil + })) + outStrmTwo, err := outBuilder.Build() + require.NoError(t, err) + go func() { + assert.NoError(t, outStrmTwo.Run(testCtx)) + }() + + n := 1000 + go func() { + for { + for i := 0; i < n; i++ { + err := inFunc(writeCtx, service.NewMessage(fmt.Appendf(nil, "hello world %v", i))) + if writeCtx.Err() != nil { + return + } + assert.NoError(t, err) + messageCountMut.Lock() + inMessages++ + messageCountMut.Unlock() + time.Sleep(time.Millisecond * 10) + } + } + }() + + assert.Eventually(t, func() bool { + messageCountMut.Lock() + countOne, countTwo := outMessagesOne, outMessagesTwo + messageCountMut.Unlock() + + t.Logf("count one: %v, count two: %v", countOne, countTwo) + return countOne > 0 && countTwo > 0 + }, time.Until(dl), time.Millisecond*500) + + var prevOne, prevTwo int + assert.Never(t, func() bool { + messageCountMut.Lock() + countOne, countTwo := outMessagesOne, outMessagesTwo + messageCountMut.Unlock() + + hasIncreased := countOne > prevOne && countTwo > prevTwo + prevOne, prevTwo = countOne, countTwo + + t.Logf("count one: %v, count two: %v", countOne, countTwo) + return !hasIncreased + }, time.Until(dl)-time.Second, time.Millisecond*500) + + writeDone() + require.NoError(t, inStrm.Stop(testCtx)) + + require.NoError(t, outStrmOne.Stop(testCtx)) + require.NoError(t, outStrmTwo.Stop(testCtx)) + done() +} + func TestIntegrationSaramaRedpanda(t *testing.T) { integration.CheckSkip(t) t.Parallel() @@ -57,7 +217,7 @@ func TestIntegrationSaramaRedpanda(t *testing.T) { _ = resource.Expire(900) require.NoError(t, pool.Retry(func() error { - return createKafkaTopic("localhost:"+kafkaPortStr, "pls_ignore_just_testing_connection", 1) + return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "pls_ignore_just_testing_connection", 1) })) template := ` @@ -109,7 +269,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptVarTwo("1"), @@ -122,7 +282,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 1)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 1)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptVarTwo("1"), @@ -136,7 +296,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptVarTwo("1000"), @@ -150,7 +310,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptVarTwo("1"), @@ -167,7 +327,7 @@ input: vars.Var4 = "group" + testID topicName := "topic-" + testID vars.Var1 = fmt.Sprintf(":0,%v:1,%v:2,%v:3", topicName, topicName, topicName) - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptSleepAfterInput(time.Second*3), @@ -181,7 +341,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptSleepAfterInput(time.Second*3), @@ -197,7 +357,7 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 1)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 1)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptSleepAfterInput(time.Second*3), @@ -213,7 +373,7 @@ input: suite.Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptSleepAfterInput(time.Second*3), @@ -254,7 +414,7 @@ input: t, templateManualPartitioner, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), integration.StreamTestOptVarTwo("1"), @@ -417,7 +577,7 @@ input: suite.Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createKafkaTopic(address, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, address, testID, 4)) }), integration.StreamTestOptVarOne(""), integration.StreamTestOptVarTwo("1"), @@ -429,7 +589,7 @@ input: suite.Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createKafkaTopic(address, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, address, testID, 4)) }), integration.StreamTestOptVarOne(""), integration.StreamTestOptVarTwo("1000"), @@ -453,7 +613,7 @@ input: suiteSingleCheckpointedStream.Run( t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 1)) + require.NoError(t, createKafkaTopic(ctx, "localhost:"+kafkaPortStr, testID, 1)) }), integration.StreamTestOptVarOne(":0"), integration.StreamTestOptVarTwo("1000"), @@ -468,7 +628,7 @@ input: integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { topicName := "topic-" + testID vars.Var1 = fmt.Sprintf(":0,%v:1,%v:2,%v:3", topicName, topicName, topicName) - require.NoError(t, createKafkaTopic(address, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, address, testID, 4)) }), integration.StreamTestOptSleepAfterInput(time.Second*3), integration.StreamTestOptVarTwo("1"), @@ -482,7 +642,7 @@ input: integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { topicName := "topic-" + testID vars.Var1 = fmt.Sprintf(":0,%v:1,%v:2,%v:3", topicName, topicName, topicName) - require.NoError(t, createKafkaTopic(address, testID, 4)) + require.NoError(t, createKafkaTopic(ctx, address, testID, 4)) }), integration.StreamTestOptSleepAfterInput(time.Second*3), integration.StreamTestOptVarTwo("1000"), diff --git a/internal/impl/kafka/integration_test.go b/internal/impl/kafka/integration_test.go index d967815b5c..b652bb9908 100644 --- a/internal/impl/kafka/integration_test.go +++ b/internal/impl/kafka/integration_test.go @@ -21,7 +21,7 @@ import ( "github.com/twmb/franz-go/pkg/sasl/scram" ) -func createKafkaTopic(address, id string, partitions int32) error { +func createKafkaTopic(ctx context.Context, address, id string, partitions int32) error { topicName := fmt.Sprintf("topic-%v", id) cl, err := kgo.NewClient(kgo.SeedBrokers(address)) @@ -37,19 +37,14 @@ func createKafkaTopic(address, id string, partitions int32) error { topicReq.ReplicationFactor = 1 createTopicsReq.Topics = append(createTopicsReq.Topics, topicReq) - res, err := createTopicsReq.RequestWith(context.Background(), cl) + res, err := createTopicsReq.RequestWith(ctx, cl) if err != nil { return err } if len(res.Topics) != 1 { return fmt.Errorf("expected one topic in response, saw %d", len(res.Topics)) } - t := res.Topics[0] - - if err := kerr.ErrorForCode(t.ErrorCode); err != nil { - return fmt.Errorf("topic creation failure: %w", err) - } - return nil + return kerr.ErrorForCode(res.Topics[0].ErrorCode) } func TestIntegrationKafka(t *testing.T) { @@ -88,7 +83,7 @@ func TestIntegrationKafka(t *testing.T) { _ = resource.Expire(900) require.NoError(t, pool.Retry(func() error { - return createKafkaTopic("localhost:"+kafkaPortStr, "testingconnection", 1) + return createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, "testingconnection", 1) })) template := ` @@ -127,10 +122,43 @@ input: t, template, integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { vars.Var4 = "group" + testID - require.NoError(t, createKafkaTopic("localhost:"+kafkaPortStr, testID, 4)) + require.NoError(t, createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, testID, 4)) }), integration.StreamTestOptPort(kafkaPortStr), ) + + manualPartitionTemplate := ` +output: + kafka_franz: + seed_brokers: [ localhost:$PORT ] + topic: topic-$ID + max_in_flight: $MAX_IN_FLIGHT + timeout: "5s" + partitioner: manual + partition: "0" + metadata: + include_patterns: [ .* ] + batching: + count: $OUTPUT_BATCH_COUNT + +input: + kafka_franz: + seed_brokers: [ localhost:$PORT ] + topics: [ topic-$ID$VAR1 ] + consumer_group: "$VAR4" + checkpoint_limit: 100 + commit_period: "1s" +` + t.Run("manual_partitioner", func(t *testing.T) { + suite.Run( + t, manualPartitionTemplate, + integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, testID string, vars *integration.StreamTestConfigVars) { + vars.Var4 = "group" + testID + require.NoError(t, createKafkaTopic(context.Background(), "localhost:"+kafkaPortStr, testID, 1)) + }), + integration.StreamTestOptPort(kafkaPortStr), + ) + }) } func createKafkaTopicSasl(address, id string, partitions int32) error { diff --git a/internal/impl/kafka/output_kafka_franz.go b/internal/impl/kafka/output_kafka_franz.go index 2844850c07..621ade3496 100644 --- a/internal/impl/kafka/output_kafka_franz.go +++ b/internal/impl/kafka/output_kafka_franz.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "math" + "strconv" "strings" "time" @@ -17,19 +18,12 @@ import ( func franzKafkaOutputConfig() *service.ConfigSpec { return service.NewConfigSpec(). - // Stable(). TODO + Beta(). Categories("Services"). Version("3.61.0"). Summary("An alternative Kafka output using the [Franz Kafka client library](https://github.com/twmb/franz-go)."). Description(` -Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input. - -This output is new and experimental, and the existing ` + "`kafka`" + ` input is not going anywhere, but here's some reasons why it might be worth trying this one out: - -- You like shiny new stuff -- You are experiencing issues with the existing ` + "`kafka`" + ` output -- Someone told you to -`). +Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input.`). Field(service.NewStringListField("seed_brokers"). Description("A list of broker addresses to connect to in order to establish connections. If an item of the list contains commas it will be expanded into multiple addresses."). Example([]string{"localhost:9092"}). @@ -43,9 +37,14 @@ This output is new and experimental, and the existing ` + "`kafka`" + ` input is "murmur2_hash": "Kafka's default hash algorithm that uses a 32-bit murmur2 hash of the key to compute which partition the record will be on.", "round_robin": "Round-robin's messages through all available partitions. This algorithm has lower throughput and causes higher CPU load on brokers, but can be useful if you want to ensure an even distribution of records to partitions.", "least_backup": "Chooses the least backed up partition (the partition with the fewest amount of buffered records). Partitions are selected per batch.", + "manual": "Manually select a partition for each message, requires the field `partition` to be specified.", }). Description("Override the default murmur2 hashing partitioner."). Advanced().Optional()). + Field(service.NewInterpolatedStringField("partition"). + Description("An optional explicit partition to set for each message. This field is only relevant when the `partitioner` is set to `manual`. The provided interpolation string must be a valid integer."). + Example(`${! meta("partition") }`). + Optional()). Field(service.NewMetadataFilterField("metadata"). Description("Determine which (if any) metadata values should be added to messages as headers."). Optional()). @@ -68,7 +67,15 @@ This output is new and experimental, and the existing ` + "`kafka`" + ` input is Optional(). Advanced()). Field(service.NewTLSToggledField("tls")). - Field(saslField()) + Field(saslField()). + LintRule(` +root = if this.partitioner == "manual" { + if this.partition.or("") == "" { + "a partition must be specified when the partitioner is set to manual" + } +} else if this.partition.or("") != "" { + "a partition cannot be specified unless the partitioner is set to manual" +}`) } func init() { @@ -100,6 +107,7 @@ type franzKafkaWriter struct { topicStr string topic *service.InterpolatedString key *service.InterpolatedString + partition *service.InterpolatedString tlsConf *tls.Config saslConfs []sasl.Mechanism metaFilter *service.MetadataFilter @@ -137,6 +145,14 @@ func newFranzKafkaWriterFromConfig(conf *service.ParsedConfig, log *service.Logg } } + if conf.Contains("partition") { + if rawStr, _ := conf.FieldString("partition"); rawStr != "" { + if f.partition, err = conf.FieldInterpolatedString("partition"); err != nil { + return nil, err + } + } + } + if f.timeout, err = conf.FieldDuration("timeout"); err != nil { return nil, err } @@ -191,6 +207,8 @@ func newFranzKafkaWriterFromConfig(conf *service.ParsedConfig, log *service.Logg f.partitioner = kgo.RoundRobinPartitioner() case "least_backup": f.partitioner = kgo.LeastBackupPartitioner() + case "manual": + f.partitioner = kgo.ManualPartitioner() default: return nil, fmt.Errorf("unknown partitioner: %v", partStr) } @@ -272,6 +290,17 @@ func (f *franzKafkaWriter) WriteBatch(ctx context.Context, b service.MessageBatc return fmt.Errorf("key interpolation error: %w", err) } } + if f.partition != nil { + partStr, err := b.TryInterpolatedString(i, f.partition) + if err != nil { + return fmt.Errorf("partition interpolation error: %w", err) + } + partInt, err := strconv.Atoi(partStr) + if err != nil { + return fmt.Errorf("partition parse error: %w", err) + } + record.Partition = int32(partInt) + } _ = f.metaFilter.Walk(msg, func(key, value string) error { record.Headers = append(record.Headers, kgo.RecordHeader{ Key: key, diff --git a/internal/impl/kafka/output_kafka_franz_test.go b/internal/impl/kafka/output_kafka_franz_test.go new file mode 100644 index 0000000000..1041dfe1a1 --- /dev/null +++ b/internal/impl/kafka/output_kafka_franz_test.go @@ -0,0 +1,70 @@ +package kafka + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/benthosdev/benthos/v4/public/service" +) + +func TestKafkaFranzOutputBadParams(t *testing.T) { + testCases := []struct { + name string + conf string + errContains string + }{ + { + name: "manual partitioner with a partition", + conf: ` +kafka_franz: + seed_brokers: [ foo:1234 ] + topic: foo + partitioner: manual + partition: '${! meta("foo") }' +`, + }, + { + name: "non manual partitioner without a partition", + conf: ` +kafka_franz: + seed_brokers: [ foo:1234 ] + topic: foo +`, + }, + { + name: "manual partitioner with no partition", + conf: ` +kafka_franz: + seed_brokers: [ foo:1234 ] + topic: foo + partitioner: manual +`, + errContains: "a partition must be specified when the partitioner is set to manual", + }, + { + name: "partition without manual partitioner", + conf: ` +kafka_franz: + seed_brokers: [ foo:1234 ] + topic: foo + partition: '${! meta("foo") }' +`, + errContains: "a partition cannot be specified unless the partitioner is set to manual", + }, + } + + for _, test := range testCases { + test := test + t.Run(test.name, func(t *testing.T) { + err := service.NewStreamBuilder().AddOutputYAML(test.conf) + if test.errContains == "" { + assert.NoError(t, err) + } else { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} diff --git a/internal/impl/pure/cache_memory.go b/internal/impl/pure/cache_memory.go index 4d4c8e36a4..d7509dabce 100644 --- a/internal/impl/pure/cache_memory.go +++ b/internal/impl/pure/cache_memory.go @@ -177,9 +177,8 @@ func (m *memoryCache) getShard(key string) *shard { if len(m.shards) == 1 { return m.shards[0] } - h := xxhash.New64() - _, _ = h.WriteString(key) - return m.shards[h.Sum64()%uint64(len(m.shards))] + + return m.shards[xxhash.ChecksumString64(key)%uint64(len(m.shards))] } func (m *memoryCache) Get(_ context.Context, key string) ([]byte, error) { diff --git a/internal/impl/sql/integration_test.go b/internal/impl/sql/integration_test.go index 66ead49aae..8de70a685a 100644 --- a/internal/impl/sql/integration_test.go +++ b/internal/impl/sql/integration_test.go @@ -545,19 +545,8 @@ func testSuite(t *testing.T, driver, dsn string, createTableFn func(string) erro } } -func TestIntegration(t *testing.T) { +func TestIntegrationClickhouse(t *testing.T) { integration.CheckSkip(t) - - t.Run("clickhouse", clickhouseIntegration) - t.Run("clickhouse_old", clickhouseOldIntegration) - t.Run("postgres", postgresIntegration) - t.Run("mysql", mySQLIntegration) - t.Run("mssql", msSQLIntegration) - t.Run("sqlite", sqliteIntegration) - t.Run("oracle", oracleIntegration) -} - -func clickhouseIntegration(t *testing.T) { t.Parallel() pool, err := dockertest.NewPool("") @@ -611,7 +600,8 @@ func clickhouseIntegration(t *testing.T) { testSuite(t, "clickhouse", dsn, createTable) } -func clickhouseOldIntegration(t *testing.T) { +func TestIntegrationOldClickhouse(t *testing.T) { + integration.CheckSkip(t) t.Parallel() pool, err := dockertest.NewPool("") @@ -665,7 +655,8 @@ func clickhouseOldIntegration(t *testing.T) { testSuite(t, "clickhouse", dsn, createTable) } -func postgresIntegration(t *testing.T) { +func TestIntegrationPostgres(t *testing.T) { + integration.CheckSkip(t) t.Parallel() pool, err := dockertest.NewPool("") @@ -725,7 +716,8 @@ func postgresIntegration(t *testing.T) { testSuite(t, "postgres", dsn, createTable) } -func mySQLIntegration(t *testing.T) { +func TestIntegrationMySQL(t *testing.T) { + integration.CheckSkip(t) t.Parallel() pool, err := dockertest.NewPool("") @@ -788,7 +780,8 @@ func mySQLIntegration(t *testing.T) { testSuite(t, "mysql", dsn, createTable) } -func msSQLIntegration(t *testing.T) { +func TestIntegrationMSSQL(t *testing.T) { + integration.CheckSkip(t) t.Parallel() pool, err := dockertest.NewPool("") @@ -848,7 +841,8 @@ func msSQLIntegration(t *testing.T) { testSuite(t, "mssql", dsn, createTable) } -func sqliteIntegration(t *testing.T) { +func TestIntegrationSQLite(t *testing.T) { + integration.CheckSkip(t) t.Parallel() var db *sql.DB @@ -890,7 +884,8 @@ func sqliteIntegration(t *testing.T) { testSuite(t, "sqlite", dsn, createTable) } -func oracleIntegration(t *testing.T) { +func TestIntegrationOracle(t *testing.T) { + integration.CheckSkip(t) t.Parallel() pool, err := dockertest.NewPool("") diff --git a/internal/integration/stream_test_helpers.go b/internal/integration/stream_test_helpers.go index 8aac094eeb..f410e1aedf 100644 --- a/internal/integration/stream_test_helpers.go +++ b/internal/integration/stream_test_helpers.go @@ -36,6 +36,13 @@ func CheckSkip(t testing.TB) { } } +// CheckSkipExact skips a test unless the -run flag specifically targets it. +func CheckSkipExact(t testing.TB) { + if m := flag.Lookup("test.run").Value.String(); m == "" || m != t.Name() { + t.Skipf("Skipping as execution was not requested explicitly using go test -run %v", t.Name()) + } +} + // GetFreePort attempts to get a free port. This involves creating a bind and // then immediately dropping it and so it's ever so slightly flakey. func GetFreePort() (int, error) { diff --git a/website/docs/components/inputs/kafka_franz.md b/website/docs/components/inputs/kafka_franz.md index 7e9da1614f..9d3759dd6f 100644 --- a/website/docs/components/inputs/kafka_franz.md +++ b/website/docs/components/inputs/kafka_franz.md @@ -1,7 +1,7 @@ --- title: kafka_franz type: input -status: experimental +status: beta categories: ["Services"] --- @@ -14,8 +14,8 @@ categories: ["Services"] import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -:::caution EXPERIMENTAL -This component is experimental and therefore subject to change or removal outside of major version releases. +:::caution BETA +This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found. ::: An alternative Kafka input using the [Franz Kafka client library](https://github.com/twmb/franz-go). @@ -71,12 +71,6 @@ input: Consumes one or more topics by balancing the partitions across any other connected clients with the same consumer group. -This input is new and experimental, and the existing `kafka` input is not going anywhere, but here's some reasons why it might be worth trying this one out: - -- You like shiny new stuff -- You are experiencing issues with the existing `kafka` input -- Someone told you to - ### Metadata This input adds the following metadata fields to each message: diff --git a/website/docs/components/outputs/kafka_franz.md b/website/docs/components/outputs/kafka_franz.md index bb1e4c287c..1af5fb6184 100644 --- a/website/docs/components/outputs/kafka_franz.md +++ b/website/docs/components/outputs/kafka_franz.md @@ -1,7 +1,7 @@ --- title: kafka_franz type: output -status: experimental +status: beta categories: ["Services"] --- @@ -14,8 +14,8 @@ categories: ["Services"] import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -:::caution EXPERIMENTAL -This component is experimental and therefore subject to change or removal outside of major version releases. +:::caution BETA +This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found. ::: An alternative Kafka output using the [Franz Kafka client library](https://github.com/twmb/franz-go). @@ -37,6 +37,7 @@ output: seed_brokers: [] topic: "" key: "" + partition: "" metadata: include_prefixes: [] include_patterns: [] @@ -60,6 +61,7 @@ output: topic: "" key: "" partitioner: "" + partition: "" metadata: include_prefixes: [] include_patterns: [] @@ -88,13 +90,6 @@ output: Writes a batch of messages to Kafka brokers and waits for acknowledgement before propagating it back to the input. -This output is new and experimental, and the existing `kafka` input is not going anywhere, but here's some reasons why it might be worth trying this one out: - -- You like shiny new stuff -- You are experiencing issues with the existing `kafka` output -- Someone told you to - - ## Fields ### `seed_brokers` @@ -144,10 +139,25 @@ Type: `string` | Option | Summary | |---|---| | `least_backup` | Chooses the least backed up partition (the partition with the fewest amount of buffered records). Partitions are selected per batch. | +| `manual` | Manually select a partition for each message, requires the field `partition` to be specified. | | `murmur2_hash` | Kafka's default hash algorithm that uses a 32-bit murmur2 hash of the key to compute which partition the record will be on. | | `round_robin` | Round-robin's messages through all available partitions. This algorithm has lower throughput and causes higher CPU load on brokers, but can be useful if you want to ensure an even distribution of records to partitions. | +### `partition` + +An optional explicit partition to set for each message. This field is only relevant when the `partitioner` is set to `manual`. The provided interpolation string must be a valid integer. +This field supports [interpolation functions](/docs/configuration/interpolation#bloblang-queries). + + +Type: `string` + +```yml +# Examples + +partition: ${! meta("partition") } +``` + ### `metadata` Determine which (if any) metadata values should be added to messages as headers. diff --git a/website/docs/guides/bloblang/advanced.md b/website/docs/guides/bloblang/advanced.md index e5dc9acaed..083598ee78 100644 --- a/website/docs/guides/bloblang/advanced.md +++ b/website/docs/guides/bloblang/advanced.md @@ -20,9 +20,7 @@ root.b = this.b.apply("formatting") # Out: {"a":"(foo)","b":"(bar)"} ``` -However, in cases where we wish to provide multiple named parameters to a mapping we can execute them on object literals for the same effect: - -However, we can still use object literals for this purpose. Imagine if we wanted a map that is the exact same as above except the pattern is `[%v]` instead, with the potential for even more patterns in the future. To do that we can pass an object with a field `value` with our target to map and a field `pattern` which allows us to specify the pattern to apply: +However, we can use object literals in order to provide multiple map parameters. Imagine if we wanted a map that is the exact same as above except the pattern is `[%v]` instead, with the potential for even more patterns in the future. To do that we can pass an object with a field `value` with our target to map and a field `pattern` which allows us to specify the pattern to apply: ```coffee map formatting {