Process lines or records in parallel.
This package helps to increase the performance of command line filters that transform data and read their input in a line- or record-oriented fashion.
Note: The order of the input lines is not preserved in the output.
The main type is a parallel.Processor, which reads from an io.Reader, applies a function to each input line (separated by a newline by default) and writes the result to an io.Writer.
The transformation function takes a byte slice and therefore does not assume any specific format, so the input may be plain lines, CSV, newline-delimited JSON, or similar line-oriented formats. The output is just bytes and, again, can be in any format.
An example of the identity transform:
// Noop is the identity transform: it returns its input unchanged.
func Noop(b []byte) ([]byte, error) {
	return b, nil
}
We can connect this function to IO and let it run:
p := parallel.NewProcessor(os.Stdin, os.Stdout, Noop)
if err := p.Run(); err != nil {
	log.Fatal(err)
}
That's all the setup needed. For details and self-contained programs, see the examples.
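Put together, a self-contained version of the program above might look like the following sketch (the import path is an assumption and may differ):

package main

import (
	"log"
	"os"

	"github.com/miku/parallel" // assumed import path
)

// Noop passes each input line through unchanged.
func Noop(b []byte) ([]byte, error) {
	return b, nil
}

func main() {
	p := parallel.NewProcessor(os.Stdin, os.Stdout, Noop)
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}

Compiled into a binary, it behaves like any other shell filter, reading from standard input and writing to standard output.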
The processor has a few attributes that can be adjusted prior to running:
p := parallel.NewProcessor(os.Stdin, os.Stdout, parallel.ToTransformerFunc(bytes.ToUpper))

// Adjust processor options.
p.NumWorkers = 4         // number of workers (defaults to runtime.NumCPU())
p.BatchSize = 10000      // how many records to batch before sending them to a worker
p.RecordSeparator = '\n' // record separator (must be a single byte at the moment)

if err := p.Run(); err != nil {
	log.Fatal(err)
}
The defaults should work for most cases. Batches are kept in memory, so higher batch sizes will need more memory but will decrease the coordination overhead. Sometimes, a batch size of one can be useful too.
It is possible to parallelize record-oriented data, too. The record.Processor additionally takes a split function, which is passed internally to a bufio.Scanner; the scanner parses the input and concatenates a number of records into a batch, which is then passed to the transformation function.
The bufio package contains a number of split functions, such as ScanWords. We originally implemented record support for fast XML processing, and for that we added a TagSplitter, which can split input on XML tags.
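As a sketch, whitespace-separated tokens could be processed like this; note that record.NewProcessor and its exact argument order are assumptions here, mirroring parallel.NewProcessor with an extra split function:

// Sketch only: record.NewProcessor and its argument order are assumptions,
// mirroring parallel.NewProcessor plus a split function; the transform is
// assumed to be applied per record.
p := record.NewProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
	// Upper-case each whitespace-separated token and emit it on its own line.
	return append(bytes.ToUpper(b), '\n'), nil
}, bufio.ScanWords)
if err := p.Run(); err != nil {
	log.Fatal(err)
}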
Combining parallel with a fast JSON library, such as jsoniter, one can process up to 100000 JSON documents (of about 1K in size) per second. Here is an example snippet.
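A minimal sketch of such a filter, extracting a single field from newline-delimited JSON (the "id" field, the import paths, and the use of jsoniter's ConfigCompatibleWithStandardLibrary are illustrative assumptions):

package main

import (
	"log"
	"os"

	jsoniter "github.com/json-iterator/go"
	"github.com/miku/parallel" // assumed import path
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func main() {
	// Parse each newline-delimited JSON document and emit a single,
	// hypothetical "id" field per line.
	p := parallel.NewProcessor(os.Stdin, os.Stdout, func(b []byte) ([]byte, error) {
		var doc struct {
			ID string `json:"id"`
		}
		if err := json.Unmarshal(b, &doc); err != nil {
			return nil, err
		}
		return []byte(doc.ID + "\n"), nil
	})
	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}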