Skip to content

Commit

Permalink
Optimize parquet.MergeRowGroups (#431)
Browse files Browse the repository at this point in the history
* add parquet.MultiRowWriter

* return io.ErrShortWrite if a writer does not write all the rows

* add parquet.RowBuffer[T]

* benchmark + optimize sort of parquet.RowBuffer

* add tests for ordering of parquet.RowBuffer

* rename: firstIndexOfRepeatedColumn => direct

* fix memory leak occurring when reading rows in chunks

* add parquet.SortingWriter

* use WriteRowGroups instead of CopyRows

* refactor sorting configuration

* add dedupe reader and writer

* implement drop of duplicated rows in sorting writer

* fix gofmt

* fix Go 1.17 compilation

* add merge benchmark based on parquet.RowBuffer to reduce noise

* rewrite merge row groups implementation to use row comparator function

* remove unused parquet.SortFunc APIs

* fix column path

* fix git merge conflict resolution
  • Loading branch information
Achille authored Dec 7, 2022
1 parent 813b06a commit 95f6ec7
Show file tree
Hide file tree
Showing 11 changed files with 327 additions and 560 deletions.
6 changes: 3 additions & 3 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *Column) Encoding() encoding.Encoding { return c.encoding }
// Compression returns the compression codec of the column.
func (c *Column) Compression() compress.Codec {
	return c.compression
}

// Path of the column in the parquet schema.
//
// The returned slice skips the first element of the underlying path
// (presumably the schema root node — confirm against how columnPath is
// built in columnLoader.open), so callers see the path relative to the
// root rather than including it.
func (c *Column) Path() []string { return c.path[1:] }

// Name returns the column name as recorded in its schema element.
func (c *Column) Name() string {
	return c.schema.Name
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (cl *columnLoader) open(file *File, path []string) (*Column, error) {
file: file,
schema: &file.metadata.Schema[cl.schemaIndex],
}
c.path = c.path.append(c.schema.Name)
c.path = columnPath(path).append(c.schema.Name)

cl.schemaIndex++
numChildren := int(c.schema.NumChildren)
Expand Down Expand Up @@ -356,7 +356,7 @@ func (cl *columnLoader) open(file *File, path []string) (*Column, error) {
}

var err error
c.columns[i], err = cl.open(file, path)
c.columns[i], err = cl.open(file, c.path)
if err != nil {
return nil, fmt.Errorf("%s: %w", c.schema.Name, err)
}
Expand Down
112 changes: 59 additions & 53 deletions compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,75 +178,81 @@ func lessBE128(v1, v2 *[16]byte) bool {
}

func compareRowsFuncOf(schema *Schema, sortingColumns []SortingColumn) func(Row, Row) int {
compareFuncs := make([]func(Row, Row) int, 0, len(sortingColumns))
compareFuncs := make([]func(Row, Row) int, len(sortingColumns))
direct := true

for _, column := range schema.Columns() {
leaf, _ := schema.Lookup(column...)
if leaf.MaxRepetitionLevel > 0 {
forEachLeafColumnOf(schema, func(leaf leafColumn) {
if leaf.maxRepetitionLevel > 0 {
direct = false
}

for _, sortingColumn := range sortingColumns {
path1 := columnPath(column)
path2 := columnPath(sortingColumn.Path())

if path1.equal(path2) {
descending := sortingColumn.Descending()
optional := leaf.MaxDefinitionLevel > 0
sortFunc := (func(Row, Row) int)(nil)

if direct && !optional {
// This is an optimization for the common case where rows
// are sorted by non-optional, non-repeated columns.
//
// The sort function can make the assumption that it will
// find the column value at the current column index, and
// does not need to scan the rows looking for values with
// a matching column index.
//
// A second optimization consists in passing the column type
// directly to the sort function instead of an intermediary
// closure, which removes an indirection layer and improves
// throughput by ~20% in BenchmarkSortRowBuffer.
typ := leaf.Node.Type()
if descending {
sortFunc = compareRowsFuncOfIndexDescending(leaf.ColumnIndex, typ)
} else {
sortFunc = compareRowsFuncOfIndexAscending(leaf.ColumnIndex, typ)
}
if sortingIndex := searchSortingColumn(sortingColumns, leaf.path); sortingIndex < len(sortingColumns) {
sortingColumn := sortingColumns[sortingIndex]
descending := sortingColumn.Descending()
optional := leaf.maxDefinitionLevel > 0
sortFunc := (func(Row, Row) int)(nil)

if direct && !optional {
// This is an optimization for the common case where rows
// are sorted by non-optional, non-repeated columns.
//
// The sort function can make the assumption that it will
// find the column value at the current column index, and
// does not need to scan the rows looking for values with
// a matching column index.
//
// A second optimization consists in passing the column type
// directly to the sort function instead of an intermediary
// closure, which removes an indirection layer and improves
// throughput by ~20% in BenchmarkSortRowBuffer.
typ := leaf.node.Type()
if descending {
sortFunc = compareRowsFuncOfIndexDescending(leaf.columnIndex, typ)
} else {
compare := leaf.Node.Type().Compare
sortFunc = compareRowsFuncOfIndexAscending(leaf.columnIndex, typ)
}
} else {
compare := leaf.node.Type().Compare

if descending {
compare = CompareDescending(compare)
}
if descending {
compare = CompareDescending(compare)
}

if optional {
if sortingColumn.NullsFirst() {
compare = CompareNullsFirst(compare)
} else {
compare = CompareNullsLast(compare)
}
if optional {
if sortingColumn.NullsFirst() {
compare = CompareNullsFirst(compare)
} else {
compare = CompareNullsLast(compare)
}

sortFunc = compareRowsFuncOfScan(leaf.ColumnIndex, compare)
}

compareFuncs = append(compareFuncs, sortFunc)
sortFunc = compareRowsFuncOfScan(leaf.columnIndex, compare)
}

compareFuncs[sortingIndex] = sortFunc
}
})

// When some sorting columns were not found on the schema it is possible for
// the list of compare functions to still contain nil values; we compact it
// here to keep only the columns that we found comparators for.
n := 0
for _, f := range compareFuncs {
if f != nil {
compareFuncs[n] = f
n++
}
}

// For the common case where rows are sorted by a single column, we can skip
// looping over the list of sort functions.
switch len(compareFuncs) {
switch n {
case 0:
return compareRowsUnordered
case 1:
return compareFuncs[0]
default:
return compareRowsFuncOfColumns(compareFuncs)
return compareRowsFuncOfColumns(compareFuncs[:n])
}
}

Expand All @@ -265,28 +271,28 @@ func compareRowsFuncOfColumns(compareFuncs []func(Row, Row) int) func(Row, Row)
}

//go:noinline
func compareRowsFuncOfIndexAscending(columnIndex int, typ Type) func(Row, Row) int {
func compareRowsFuncOfIndexAscending(columnIndex int16, typ Type) func(Row, Row) int {
return func(row1, row2 Row) int { return typ.Compare(row1[columnIndex], row2[columnIndex]) }
}

//go:noinline
func compareRowsFuncOfIndexDescending(columnIndex int, typ Type) func(Row, Row) int {
func compareRowsFuncOfIndexDescending(columnIndex int16, typ Type) func(Row, Row) int {
return func(row1, row2 Row) int { return -typ.Compare(row1[columnIndex], row2[columnIndex]) }
}

//go:noinline
func compareRowsFuncOfScan(columnIndex int, compare func(Value, Value) int) func(Row, Row) int {
columnIndexOfValues := ^int16(columnIndex)
func compareRowsFuncOfScan(columnIndex int16, compare func(Value, Value) int) func(Row, Row) int {
columnIndex = ^columnIndex
return func(row1, row2 Row) int {
i1 := 0
i2 := 0

for {
for i1 < len(row1) && row1[i1].columnIndex != columnIndexOfValues {
for i1 < len(row1) && row1[i1].columnIndex != columnIndex {
i1++
}

for i2 < len(row2) && row2[i2].columnIndex != columnIndexOfValues {
for i2 < len(row2) && row2[i2].columnIndex != columnIndex {
i2++
}

Expand Down
5 changes: 1 addition & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,7 @@ func (c *SortingConfig) Apply(options ...SortingOption) {
}

// ConfigureSorting applies the receiver's sorting configuration to config,
// delegating the merge of the two configurations to coalesceSortingConfig.
func (c *SortingConfig) ConfigureSorting(config *SortingConfig) {
	*config = coalesceSortingConfig(*c, *config)
}

// FileOption is an interface implemented by types that carry configuration
Expand Down
15 changes: 15 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"hash/crc32"
"io"
"sort"
"strings"
"sync"

"github.com/segmentio/encoding/thrift"
Expand Down Expand Up @@ -415,6 +416,20 @@ type fileSortingColumn struct {
// Path returns the path of the underlying column.
func (s *fileSortingColumn) Path() []string { return s.column.Path() }

// Descending reports whether the column is sorted in descending order.
func (s *fileSortingColumn) Descending() bool { return s.descending }

// NullsFirst reports whether null values sort before non-null values.
func (s *fileSortingColumn) NullsFirst() bool { return s.nullsFirst }
// String returns a human-readable representation of the sorting column,
// e.g. "nulls_first+descending(path.to.column)".
func (s *fileSortingColumn) String() string {
	order := "ascending"
	if s.descending {
		order = "descending"
	}
	prefix := ""
	if s.nullsFirst {
		prefix = "nulls_first+"
	}
	return prefix + order + "(" + columnPath(s.Path()).String() + ")"
}

type fileColumnChunk struct {
file *File
Expand Down
Loading

0 comments on commit 95f6ec7

Please sign in to comment.