Skip to content

Commit

Permalink
Add support for amqp1 message properties being *string (redpanda-data…
Browse files Browse the repository at this point in the history
…#2318)

* Add support for amqp message properties being *string

Signed-off-by: Andreas Habel <[email protected]>

* Added more header fields and setting to enable them

Signed-off-by: Andreas Habel <[email protected]>

* Reintroduced nil check via reflection.

Signed-off-by: Andreas Habel <[email protected]>

---------

Signed-off-by: Andreas Habel <[email protected]>
Co-authored-by: Andreas Habel <[email protected]>
Co-authored-by: Ashley Jeffs <[email protected]>
  • Loading branch information
3 people authored Jan 26, 2024
1 parent 7ff658d commit 8681b9a
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 6 deletions.
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ All notable changes to this project will be documented in this file.
### Added

- Field `address_cache` added to the `socket_server` input.
- Field `read_header` added to the `amqp_1` input.
- All inputs with a `codec` field now support a new field `scanner` to replace it. Scanners are more powerful as they are configured in a structured way similar to other component types rather than via a single string field, for more information [check out the scanners page](https://www.benthos.dev/docs/components/scanners/about).
- New `diff` and `patch` Bloblang methods.
- New `processors` processor.
- Field `read_header` added to the `amqp_1` input.

### Fixed

- The `javascript` processor now handles module imports correctly.
- Bloblang `if` statements now provide explicit errors when query expressions resolve to non-boolean values.

### Fixed

- Some metadata fields from the `amqp_1` input were always empty due to type mismatch, this should no longer be the case.
- The `zip` Bloblang method no longer fails when executed without arguments.

### Changed
Expand Down
5 changes: 3 additions & 2 deletions internal/impl/amqp1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ const (
saslPassField = "password"

// Input
sourceAddrField = "source_address"
azureRenewLockField = "azure_renew_lock"
sourceAddrField = "source_address"
azureRenewLockField = "azure_renew_lock"
getMessageHeaderField = "read_header"

// Output
targetAddrField = "target_address"
Expand Down
41 changes: 40 additions & 1 deletion internal/impl/amqp1/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -33,7 +34,17 @@ This input adds the following metadata fields to each message:
- All string typed message annotations
`+"```"+`
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries).`).
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries).
By setting `+"`read_header` to `true`"+`, additional message header properties will be added to each message:
`+"``` text"+`
- amqp_durable
- amqp_priority
- amqp_ttl
- amqp_first_acquirer
- amqp_delivery_count
`+"```").
Fields(
service.NewURLField(urlField).
Description("A URL to connect to.").
Expand All @@ -58,6 +69,10 @@ You can access these metadata fields using [function interpolation](/docs/config
Version("3.45.0").
Default(false).
Advanced(),
service.NewBoolField(getMessageHeaderField).
Description("Read additional message header fields into `amqp_*` metadata properties.").
Version("4.25.0").
Default(false).Advanced(),
service.NewTLSToggledField(tlsField),
saslFieldSpec(),
).LintRule(`
Expand All @@ -83,6 +98,7 @@ type amqp1Reader struct {
urls []string
sourceAddr string
renewLock bool
getHeader bool
connOpts *amqp.ConnOptions
log *service.Logger

Expand Down Expand Up @@ -128,6 +144,10 @@ func amqp1ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (
return nil, err
}

if a.getHeader, err = conf.FieldBool(getMessageHeaderField); err != nil {
return nil, err
}

if err := saslOptFnsFromParsed(conf, a.connOpts); err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,6 +263,14 @@ func (a *amqp1Reader) ReadBatch(ctx context.Context) (service.MessageBatch, serv
amqpSetMetadata(part, "amqp_content_encoding", amqpMsg.Properties.ContentEncoding)
amqpSetMetadata(part, "amqp_creation_time", amqpMsg.Properties.CreationTime)
}
if a.getHeader && amqpMsg.Header != nil {
amqpSetMetadata(part, "amqp_durable", amqpMsg.Header.Durable)
amqpSetMetadata(part, "amqp_priority", amqpMsg.Header.Priority)
amqpSetMetadata(part, "amqp_ttl", amqpMsg.Header.TTL)
amqpSetMetadata(part, "amqp_first_acquirer", amqpMsg.Header.FirstAcquirer)
amqpSetMetadata(part, "amqp_delivery_count", amqpMsg.Header.DeliveryCount)
}

if amqpMsg.Annotations != nil {
for k, v := range amqpMsg.Annotations {
keyStr, keyIsStr := k.(string)
Expand Down Expand Up @@ -470,6 +498,11 @@ func amqpSetMetadata(p *service.Message, k string, v any) {
var metaValue string
metaKey := strings.ReplaceAll(k, "-", "_")

// If v is a pointer, and the pointer is nil, do nothing
if vType := reflect.ValueOf(v); vType.Kind() == reflect.Pointer && vType.IsNil() {
return
}

switch v := v.(type) {
case bool:
metaValue = strconv.FormatBool(v)
Expand All @@ -483,16 +516,22 @@ func amqpSetMetadata(p *service.Message, k string, v any) {
metaValue = strconv.Itoa(int(v))
case int32:
metaValue = strconv.Itoa(int(v))
case uint32:
metaValue = strconv.Itoa(int(v))
case int64:
metaValue = strconv.Itoa(int(v))
case nil:
metaValue = ""
case string:
metaValue = v
case *string:
metaValue = *v
case []byte:
metaValue = string(v)
case time.Time:
metaValue = v.Format(time.RFC3339)
case time.Duration:
metaValue = v.String()
default:
metaValue = ""
}
Expand Down
20 changes: 20 additions & 0 deletions website/docs/components/inputs/amqp_1.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ input:
urls: [] # No default (optional)
source_address: /foo # No default (required)
azure_renew_lock: false
read_header: false
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -73,6 +74,16 @@ This input adds the following metadata fields to each message:
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries).
By setting `read_header` to `true`, additional message header properties will be added to each message:

``` text
- amqp_durable
- amqp_priority
- amqp_ttl
- amqp_first_acquirer
- amqp_delivery_count
```

## Fields

### `urls`
Expand Down Expand Up @@ -123,6 +134,15 @@ Type: `bool`
Default: `false`
Requires version 3.45.0 or newer

### `read_header`

Read additional message header fields into `amqp_*` metadata properties.


Type: `bool`
Default: `false`
Requires version 4.25.0 or newer

### `tls`

Custom TLS settings can be used to override system defaults.
Expand Down

0 comments on commit 8681b9a

Please sign in to comment.