More examples
Jeffail committed Aug 7, 2018
1 parent ac7756c commit 2ee401c
61 changes: 61 additions & 0 deletions lib/stream/package.go
@@ -43,6 +43,28 @@ e.g. with four pipeline processing threads the pipeline would look like this:
\ Processing Pipeline -> Custom Processor /
\ Processing Pipeline -> Custom Processor /
Message Batches
In Benthos every message is a batch, and it is the configuration of a stream
that determines the size of each batch (usually 1). Therefore all processors,
including your custom implementations, support batches.
Sometimes your custom processors will require batches of a certain size in order
to function. It is recommended that you perform message batching using the
standard Benthos batch or combine processors, as this ensures resiliency
throughout the stream pipeline. For example, you can add a batch processor to
your input layer:
// Create a Kafka input consuming a single topic
conf := NewConfig()
conf.Input.Type = input.TypeKafka
conf.Input.Kafka.Addresses = []string{"localhost:9092"}
conf.Input.Kafka.Topic = "example_topic_one"

// Add a batch processor at the input level, grouping messages into
// batches of up to 10MB before they reach the processing pipelines
conf.Input.Processors = append(conf.Input.Processors, processor.NewConfig())
conf.Input.Processors[0].Type = processor.TypeBatch
conf.Input.Processors[0].Batch.ByteSize = 10000000 // 10MB
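Alternatively, batching by message count can be done with the combine
processor. The following is a minimal sketch assuming the combine processor's
Parts field from lib/processor; verify the field name against your version:

conf := NewConfig()
conf.Input.Type = input.TypeKafka
conf.Input.Kafka.Addresses = []string{"localhost:9092"}
conf.Input.Kafka.Topic = "example_topic_one"

// Group every 10 consecutive messages into a single batch
// (Combine.Parts is an assumption based on lib/processor)
conf.Input.Processors = append(conf.Input.Processors, processor.NewConfig())
conf.Input.Processors[0].Type = processor.TypeCombine
conf.Input.Processors[0].Combine.Parts = 10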
Horizontal Scaling
The standard set of processors of a Benthos stream is stateless and can
@@ -57,6 +79,45 @@ Kafka, for example, allows you to distribute messages across partitions, which
can either be statically distributed across consumers or, using the
kafka_balanced input type, can be dynamically distributed across consumers.
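For illustration, a dynamically balanced consumer can be configured as follows.
This is a sketch based on the kafka_balanced input used in the vertical scaling
example below; the ConsumerGroup field is an assumption:

// Create a consumer that shares topic partitions with other
// members of the same consumer group
conf := NewConfig()
conf.Input.Type = input.TypeKafkaBalanced
conf.Input.KafkaBalanced.Addresses = []string{"localhost:9092"}
conf.Input.KafkaBalanced.Topics = []string{"example_topic_one"}
conf.Input.KafkaBalanced.ConsumerGroup = "example_group" // assumed field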
Vertical Scaling
Vertically scaled message processing can be done in Benthos with parallel
processing pipelines, where the number of threads is configurable in the
pipeline section of a stream configuration. However, in order to saturate those
processing threads your configuration needs one of two things: multiple parallel
inputs or a memory buffer.
Adding a memory buffer is a simple way of scaling a single input consumer across
processing threads, but this removes the automatic delivery guarantees that
Benthos provides.
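A minimal sketch of this trade-off, assuming the memory buffer type and its
Limit field from lib/buffer (verify both against your version):

// Decouple a single input from the processing threads with an
// in-process buffer. NOTE: messages held only in this buffer are
// lost if the process crashes, weakening delivery guarantees.
conf := NewConfig()
conf.Input.Type = input.TypeKafka
conf.Input.Kafka.Addresses = []string{"localhost:9092"}
conf.Input.Kafka.Topic = "example_topic_one"
conf.Buffer.Type = buffer.TypeMemory // assumed type constant
conf.Buffer.Memory.Limit = 500000000 // 500MB, assumed field
conf.Pipeline.Threads = 4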
Instead, it is recommended that you create parallel input sources, the number
of which should at least match the number of processing threads. This retains
the delivery guarantees of your sources and sinks by keeping them tightly
coupled, and is done by configuring a broker input type. For example, in order
to process across four threads with eight parallel consumers:
// Create a Kafka input with automatic partition balancing
inputConf := input.NewConfig()
inputConf.Type = input.TypeKafkaBalanced
inputConf.KafkaBalanced.Addresses = []string{"localhost:9092"}
inputConf.KafkaBalanced.Topics = []string{"example_topic_one"}
// Create a decompression processor (default gzip)
processorConf := processor.NewConfig()
processorConf.Type = processor.TypeDecompress
// Create a stream with eight parallel inputs and four processing threads
conf := NewConfig()
conf.Input.Type = input.TypeBroker
conf.Input.Broker.Inputs = append(conf.Input.Broker.Inputs, inputConf)
conf.Input.Broker.Copies = 8
conf.Pipeline.Processors = append(conf.Pipeline.Processors, processorConf)
conf.Pipeline.Threads = 4
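Either configuration can then be used to construct and run a stream with this
package's New constructor. This is a usage sketch; the error handling shown is
illustrative and Stop's timeout signature is an assumption:

// Build and run the stream, shutting down gracefully on exit
s, err := New(conf)
if err != nil {
	panic(err)
}
defer s.Stop(time.Second) // assumed to take a shutdown timeout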
Delivery Guarantees
A Benthos stream, without a buffer (the default), guarantees at-least-once