Change multipart/parallel defaults for http and lambda
Jeffail committed Feb 24, 2022
1 parent e2b001b commit a4c246c
Showing 11 changed files with 30 additions and 50 deletions.
1 change: 0 additions & 1 deletion config/examples/discord_bot.yaml
```diff
@@ -51,7 +51,6 @@ pipeline:
     - bloblang: 'root = ""'
     - try:
         - http:
-            parallel: true
             url: https://api.github.com/repos/Jeffail/benthos/releases/latest
             verb: GET
     - bloblang: 'root = "The latest release of Benthos is %v: %v".format(this.tag_name, this.html_url)'
```
1 change: 1 addition & 0 deletions lib/output/http_client_test.go
```diff
@@ -44,6 +44,7 @@ func TestHTTPClientMultipartEnabled(t *testing.T) {
 
 	conf := NewConfig()
 	conf.Type = TypeHTTPClient
+	conf.HTTPClient.BatchAsMultipart = true
 	conf.HTTPClient.URL = ts.URL + "/testpost"
 
 	h, err := NewHTTPClient(conf, mock.NewManager(), log.Noop(), metrics.Noop())
```
4 changes: 2 additions & 2 deletions lib/output/writer/http_client.go
```diff
@@ -42,8 +42,8 @@ type HTTPClientConfig struct {
 func NewHTTPClientConfig() HTTPClientConfig {
 	return HTTPClientConfig{
 		Config:            client.NewConfig(),
-		BatchAsMultipart:  true, // TODO: V4 Set false by default.
-		MaxInFlight:       1,    // TODO: Increase this default?
+		BatchAsMultipart:  false,
+		MaxInFlight:       1, // TODO: Increase this default?
 		PropagateResponse: false,
 		Batching:          batch.NewPolicyConfig(),
 	}
```
9 changes: 3 additions & 6 deletions lib/processor/aws_lambda.go
```diff
@@ -34,10 +34,7 @@ Invokes an AWS lambda for each message. The contents of the message is the
 payload of the request, and the result of the invocation will become the new
 contents of the message.`,
 		Description: `
-It is possible to perform requests per message of a batch in parallel by setting
-the ` + "`parallel`" + ` flag to ` + "`true`" + `. The ` + "`rate_limit`" + `
-field can be used to specify a rate limit [resource](/docs/components/rate_limits/about)
-to cap the rate of requests across parallel components service wide.
+The ` + "`rate_limit`" + ` field can be used to specify a rate limit [resource](/docs/components/rate_limits/about) to cap the rate of requests across parallel components service wide.
 
 In order to map or encode the payload to a specific request body, and map the
 response back into the original payload instead of replacing it entirely, you
@@ -80,7 +77,7 @@ services. It's also possible to set them explicitly at the component level,
 allowing you to transfer data across accounts. You can find out more
 [in this document](/docs/guides/cloud/aws).`,
 		FieldSpecs: docs.FieldSpecs{
-			docs.FieldCommon("parallel", "Whether messages of a batch should be dispatched in parallel."),
+			docs.FieldDeprecated("parallel", "Whether messages of a batch should be dispatched in parallel.").HasDefault(true),
 		}.Merge(client.FieldSpecs()),
 		Examples: []docs.AnnotatedExample{
 			{
@@ -113,7 +110,7 @@ type LambdaConfig struct {
 func NewLambdaConfig() LambdaConfig {
 	return LambdaConfig{
 		Config:   client.NewConfig(),
-		Parallel: false,
+		Parallel: true,
 	}
 }
```
21 changes: 10 additions & 11 deletions lib/processor/http.go
```diff
@@ -37,11 +37,6 @@ func init() {
 Performs an HTTP request using a message batch as the request body, and replaces
 the original message parts with the body of the response.`,
 		Description: `
-If a processed message batch contains more than one message they will be sent in
-a single request as a [multipart message](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html).
-Alternatively, message batches can be sent in parallel by setting the field
-` + "`parallel` to `true`" + `.
-
 The ` + "`rate_limit`" + ` field can be used to specify a rate limit
 [resource](/docs/components/rate_limits/about) to cap the rate of requests
 across all parallel components service wide.
@@ -80,7 +75,8 @@ attempt. These failed messages will continue through the pipeline unchanged, but
 can be dropped or placed in a dead letter queue according to your config, you
 can read about these patterns [here](/docs/configuration/error_handling).`,
 		config: ihttpdocs.ClientFieldSpec(false,
-			docs.FieldBool("parallel", "When processing batched messages, whether to send messages of the batch in parallel, otherwise they are sent within a single request.").HasDefault(false)),
+			docs.FieldBool("batch_as_multipart", "Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html).").Advanced().HasDefault(false),
+			docs.FieldBool("parallel", "When processing batched messages, whether to send messages of the batch in parallel, otherwise they are sent serially.").HasDefault(false).Deprecated()),
 		Examples: []docs.AnnotatedExample{
 			{
 				Title: "Branched Request",
@@ -106,15 +102,17 @@ pipeline:
 
 // HTTPConfig contains configuration fields for the HTTP processor.
 type HTTPConfig struct {
-	Parallel      bool `json:"parallel" yaml:"parallel"`
-	client.Config `json:",inline" yaml:",inline"`
+	BatchAsMultipart bool `json:"batch_as_multipart" yaml:"batch_as_multipart"`
+	Parallel         bool `json:"parallel" yaml:"parallel"`
+	client.Config    `json:",inline" yaml:",inline"`
 }
 
 // NewHTTPConfig returns a HTTPConfig with default values.
 func NewHTTPConfig() HTTPConfig {
 	return HTTPConfig{
-		Parallel: false,
-		Config:   client.NewConfig(),
+		BatchAsMultipart: false,
+		Parallel:         false,
+		Config:           client.NewConfig(),
 	}
 }
 
@@ -131,8 +129,9 @@ func newHTTPProc(conf HTTPConfig, mgr interop.Manager) (processor.V2Batched, err
 	g := &httpProc{
 		rawURL:   conf.URL,
 		log:      mgr.Logger(),
-		parallel: conf.Parallel,
+		parallel: conf.Parallel || !conf.BatchAsMultipart,
 	}
+
 	var err error
 	if g.client, err = http.NewClient(
 		conf.Config,
```
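The resolved `parallel` flag in `newHTTPProc` combines the explicit setting with the new multipart default. As a standalone sketch (not Benthos code), the condition reduces to this truth table:

```go
package main

import "fmt"

// resolveParallel mirrors the constructor condition above as a sketch:
// requests run in parallel when explicitly requested, or whenever
// multipart batching is left disabled (the new default).
func resolveParallel(parallel, batchAsMultipart bool) bool {
	return parallel || !batchAsMultipart
}

func main() {
	fmt.Println(resolveParallel(false, false)) // new default: parallel requests
	fmt.Println(resolveParallel(false, true))  // multipart: one request per batch
	fmt.Println(resolveParallel(true, true))   // explicit parallel wins
}
```

The only way to get a single multipart request is therefore to set `batch_as_multipart: true` while leaving the deprecated `parallel` field unset.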
1 change: 0 additions & 1 deletion website/cookbooks/discord_bot.md
```diff
@@ -154,7 +154,6 @@ pipeline:
     - bloblang: 'root = ""'
     - try:
         - http:
-            parallel: true
             url: https://api.github.com/repos/Jeffail/benthos/releases/latest
             verb: GET
     - bloblang: 'root = "The latest release of Benthos is %v: %v".format(this.tag_name, this.html_url)'
```
4 changes: 0 additions & 4 deletions website/cookbooks/enrichments.md
```diff
@@ -76,7 +76,6 @@ pipeline:
         request_map: 'root.text = this.article.content'
         processors:
           - http:
-              parallel: true
               url: http://localhost:4197/claims
               verb: POST
               result_map: 'root.tmp.claims = this.claims'
@@ -201,7 +200,6 @@ pipeline:
             root.hyperbole_rank = this.tmp.hyperbole_rank
         processors:
           - http:
-              parallel: true
               url: http://localhost:4199/fakenews
               verb: POST
               result_map: 'root.article.fake_news_score = this.fake_news_rank'
@@ -266,7 +264,6 @@ pipeline:
         request_map: 'root.text = this.article.content'
         processors:
           - http:
-              parallel: true
               url: http://localhost:4197/claims
               verb: POST
               result_map: 'root.tmp.claims = this.claims'
@@ -290,7 +287,6 @@ pipeline:
             root.hyperbole_rank = this.tmp.hyperbole_rank
         processors:
           - http:
-              parallel: true
               url: http://localhost:4199/fakenews
               verb: POST
               result_map: 'root.article.fake_news_score = this.fake_news_rank'
```
4 changes: 2 additions & 2 deletions website/docs/components/outputs/http_client.md
```diff
@@ -102,7 +102,7 @@ output:
     drop_on: []
     successful_on: []
     proxy_url: ""
-    batch_as_multipart: true
+    batch_as_multipart: false
     propagate_response: false
     max_in_flight: 1
     batching:
@@ -651,7 +651,7 @@ Send message batches as a single request using [RFC1341](https://www.w3.org/Prot
 
 
 Type: `bool`
-Default: `true`
+Default: `false`
 
 ### `propagate_response`
```
15 changes: 1 addition & 14 deletions website/docs/components/processors/aws_lambda.md
````diff
@@ -34,7 +34,6 @@ Introduced in version 3.36.0.
 # Common config fields, showing default values
 label: ""
 aws_lambda:
-  parallel: false
   function: ""
 ```
 
@@ -45,7 +44,6 @@ aws_lambda:
 # All config fields, showing default values
 label: ""
 aws_lambda:
-  parallel: false
   function: ""
   rate_limit: ""
   region: ""
@@ -64,10 +62,7 @@ aws_lambda:
 </TabItem>
 </Tabs>
 
-It is possible to perform requests per message of a batch in parallel by setting
-the `parallel` flag to `true`. The `rate_limit`
-field can be used to specify a rate limit [resource](/docs/components/rate_limits/about)
-to cap the rate of requests across parallel components service wide.
+The `rate_limit` field can be used to specify a rate limit [resource](/docs/components/rate_limits/about) to cap the rate of requests across parallel components service wide.
 
 In order to map or encode the payload to a specific request body, and map the
 response back into the original payload instead of replacing it entirely, you
@@ -136,14 +131,6 @@ pipeline:
 
 ## Fields
 
-### `parallel`
-
-Whether messages of a batch should be dispatched in parallel.
-
-
-Type: `bool`
-Default: `false`
-
 ### `function`
 
 The function to invoke.
````
12 changes: 3 additions & 9 deletions website/docs/components/processors/http.md
````diff
@@ -37,7 +37,6 @@ http:
       Content-Type: application/octet-stream
     rate_limit: ""
     timeout: 5s
-    parallel: false
 ```
 
 </TabItem>
@@ -96,17 +95,12 @@ http:
     drop_on: []
     successful_on: []
     proxy_url: ""
-    parallel: false
+    batch_as_multipart: false
 ```
 
 </TabItem>
 </Tabs>
 
-If a processed message batch contains more than one message they will be sent in
-a single request as a [multipart message](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html).
-Alternatively, message batches can be sent in parallel by setting the field
-`parallel` to `true`.
-
 The `rate_limit` field can be used to specify a rate limit
 [resource](/docs/components/rate_limits/about) to cap the rate of requests
 across all parallel components service wide.
@@ -670,9 +664,9 @@ An optional HTTP proxy URL.
 
 Type: `string`
 Default: `""`
 
-### `parallel`
+### `batch_as_multipart`
 
-When processing batched messages, whether to send messages of the batch in parallel, otherwise they are sent within a single request.
+Send message batches as a single request using [RFC1341](https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html).
 
 
 Type: `bool`
````
8 changes: 8 additions & 0 deletions website/docs/guides/migration/v4/about.md
https://github.com/Jeffail/benthos/issues/399

In V3 the `pipeline.threads` field defaults to 1. If this field is explicitly set to `0` it will automatically match the number of CPUs on the host machine. In V4 this will change so that the default value of `pipeline.threads` is `-1`, where this value indicates we should match the number of host CPUs. A value of `0` will now throw a configuration error when set explicitly.
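Under those V4 semantics, a config that wants CPU-matched threading can omit the field or set `-1` explicitly; a minimal sketch:

```yaml
# V3: threads: 0 matched the host CPU count.
# V4: -1 does, and is also the default, so the field can simply be omitted.
pipeline:
  threads: -1
  processors: []
```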

## `http` processor and `http_client` output parallel by default

The `http` processor and `http_client` output now execute message batch requests as parallel individual requests by default. This behaviour can be disabled by either explicitly sending batches as multipart requests by setting `batch_as_multipart` to `true`, or by placing the processor within a `for_each`.
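For configs that relied on the old single-request behaviour, the opt-out looks like this sketch (the URL is a placeholder, not from this commit):

```yaml
pipeline:
  processors:
    - http:
        url: http://localhost:4195/target # placeholder
        verb: POST
        batch_as_multipart: true # restore one multipart request per batch
```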

## `aws_lambda` processor parallel by default

The `aws_lambda` processor now executes message batch requests in parallel. This can be disabled by placing the processor within a `for_each`.
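Serial per-message invocation can be retained with the `for_each` wrapping mentioned above; a sketch (the function name is a placeholder):

```yaml
pipeline:
  processors:
    - for_each:
        - aws_lambda:
            function: my-function # placeholder
```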

## Processor Batch Behaviour Changes

https://github.com/Jeffail/benthos/issues/408
