Skip to content

Commit

Permalink
fix(kinesis-output): adds new metrics for counting throttled writes
Browse files Browse the repository at this point in the history
  • Loading branch information
cludden committed Sep 18, 2018
1 parent 8c72286 commit e60215e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
25 changes: 19 additions & 6 deletions lib/output/writer/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ type Kinesis struct {

log log.Modular
stats metrics.Type

mThrottled metrics.StatCounter
mThrottledF metrics.StatCounter
mPartsThrottled metrics.StatCounter
mPartsThrottledF metrics.StatCounter
}

// NewKinesis creates a new Amazon Kinesis writer.Type.
Expand All @@ -115,12 +120,16 @@ func NewKinesis(
}

k := Kinesis{
conf: conf,
log: log.NewModule(".output.kinesis"),
stats: stats,
hashKey: text.NewInterpolatedString(conf.HashKey),
partitionKey: text.NewInterpolatedString(conf.PartitionKey),
streamName: aws.String(conf.Stream),
conf: conf,
log: log.NewModule(".output.kinesis"),
stats: stats,
mPartsThrottled: stats.GetCounter(".output.parts.send.throttled"),
mPartsThrottledF: stats.GetCounter(".output.kinesis.parts.send.throttled"),
mThrottled: stats.GetCounter(".output.send.throttled"),
mThrottledF: stats.GetCounter(".output.kinesis.send.throttled"),
hashKey: text.NewInterpolatedString(conf.HashKey),
partitionKey: text.NewInterpolatedString(conf.PartitionKey),
streamName: aws.String(conf.Stream),
}

var err error
Expand Down Expand Up @@ -272,6 +281,10 @@ func (a *Kinesis) Write(msg types.Message) error {
// if throttling errors detected, pause briefly
l := len(failed)
if l > 0 {
a.mThrottled.Incr(1)
a.mThrottledF.Incr(1)
a.mPartsThrottled.Incr(int64(l))
a.mPartsThrottledF.Incr(int64(l))
a.log.Warnf("scheduling retry of throttled records (%d)\n", l)
if wait == backoff.Stop {
return types.ErrTimeout
Expand Down
41 changes: 32 additions & 9 deletions lib/output/writer/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import (
"github.com/ory/dockertest"
)

var (
mockStats = metrics.DudType{}
)

var (
mThrottled = mockStats.GetCounter(".output.send.throttled")
mThrottledF = mockStats.GetCounter(".output.kinesis.send.throttled")
mPartsThrottled = mockStats.GetCounter(".output.parts.send.throttled")
mPartsThrottledF = mockStats.GetCounter(".output.kinesis.parts.send.throttled")
)

type mockKinesis struct {
kinesisiface.KinesisAPI
fn func(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
Expand Down Expand Up @@ -174,9 +185,13 @@ func TestKinesisWriteChunkWithThrottling(t *testing.T) {
return &output, nil
},
},
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
mThrottled: mThrottled,
mThrottledF: mThrottledF,
mPartsThrottled: mPartsThrottled,
mPartsThrottledF: mPartsThrottledF,
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
}

msg := message.New(nil)
Expand Down Expand Up @@ -259,9 +274,13 @@ func TestKinesisWriteMessageThrottling(t *testing.T) {
return &output, nil
},
},
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
mThrottled: mThrottled,
mThrottledF: mThrottledF,
mPartsThrottled: mPartsThrottled,
mPartsThrottledF: mPartsThrottledF,
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
}

msg := message.New(nil)
Expand Down Expand Up @@ -301,9 +320,13 @@ func TestKinesisWriteBackoffMaxRetriesExceeded(t *testing.T) {
return &output, nil
},
},
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
mThrottled: mThrottled,
mThrottledF: mThrottledF,
mPartsThrottled: mPartsThrottled,
mPartsThrottledF: mPartsThrottledF,
log: log.Noop(),
partitionKey: text.NewInterpolatedString("${!json_field:id}"),
hashKey: text.NewInterpolatedString(""),
}

msg := message.New(nil)
Expand Down

0 comments on commit e60215e

Please sign in to comment.