Skip to content

Commit

Permalink
add parquet.Dictionary.Page method (parquet-go#116)
Browse files Browse the repository at this point in the history
* decouple value readers from page types

* add Values method to dictionaries

* remove parquet.Dictionary.ReadFrom in favor of parquet.Type.ReadDictionary

* replace parquet.Dictionary.WriteTo and parquet.Dictionary.Values methods with parquet.Dictionary.Page

* revert page reader changes

* fix 1.17 build

* pass column index to parquet.Type.NewDictionary

* fix column index when reading fixed-length byte array dictionaries

* fix column index when reading boolean dictionaries on go 1.17

* PR feedback
  • Loading branch information
Achille authored Mar 30, 2022
1 parent 6723479 commit 542d8f0
Show file tree
Hide file tree
Showing 14 changed files with 698 additions and 423 deletions.
5 changes: 3 additions & 2 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,19 @@ func (buf *Buffer) configure(schema *Schema) {

forEachLeafColumnOf(schema, func(leaf leafColumn) {
nullOrdering := nullsGoLast
columnIndex := int(leaf.columnIndex)
columnType := leaf.node.Type()
bufferSize := buf.config.ColumnBufferSize
dictionary := (Dictionary)(nil)
encoding, _ := encodingAndCompressionOf(leaf.node)

if isDictionaryEncoding(encoding) {
bufferSize /= 2
dictionary = columnType.NewDictionary(bufferSize)
dictionary = columnType.NewDictionary(columnIndex, bufferSize)
columnType = dictionary.Type()
}

column := columnType.NewColumnBuffer(int(leaf.columnIndex), bufferSize)
column := columnType.NewColumnBuffer(columnIndex, bufferSize)
switch {
case leaf.maxRepetitionLevel > 0:
column = newRepeatedColumnBuffer(column, leaf.maxRepetitionLevel, leaf.maxDefinitionLevel, nullOrdering)
Expand Down
2 changes: 1 addition & 1 deletion buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestBuffer(t *testing.T) {
typ parquet.Type
}{
{scenario: "plain", typ: test.typ},
{scenario: "indexed", typ: test.typ.NewDictionary(0).Type()},
{scenario: "indexed", typ: test.typ.NewDictionary(0, 0).Type()},
} {
t.Run(config.scenario, func(t *testing.T) {
for _, mod := range [...]struct {
Expand Down
3 changes: 2 additions & 1 deletion column_buffer_go18.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func (col *columnBuffer[T]) Write(b []byte) (int, error) {
if (len(b) % sizeof[T]()) != 0 {
return 0, fmt.Errorf("cannot write %s values from input of size %d", col.class.name, len(b))
}
return col.WriteRequired(cast.BytesToSlice[T](b))
n, err := col.WriteRequired(cast.BytesToSlice[T](b))
return sizeof[T]() * n, err
}

func (col *columnBuffer[T]) WriteRequired(values []T) (int, error) {
Expand Down
186 changes: 99 additions & 87 deletions dictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ type Dictionary interface {
// Returns the min and max values found in the given indexes.
Bounds(indexed []int32) (min, max Value)

// Reads the dictionary from the decoder passed as argument.
//
// The dictionary is cleared prior to loading the values so that its final
// content contains only the entries read from the decoder.
ReadFrom(encoding.Decoder) error

// Writes the dictionary to the encoder passed as argument.
WriteTo(encoding.Encoder) error

// Resets the dictionary to its initial state, removing all values.
Reset()

// Returns a BufferedPage representing the content of the dictionary.
//
// The returned page shares the underlying memory of the buffer, it remains
// valid to use until the dictionary's Reset method is called.
Page() BufferedPage
}

func dictCap(bufferSize, valueItemSize int) int {
Expand All @@ -66,15 +63,41 @@ func dictCap(bufferSize, valueItemSize int) int {
}

type byteArrayDictionary struct {
typ Type
values encoding.ByteArrayList
index map[string]int32
byteArrayPage
typ Type
index map[string]int32
}

func newByteArrayDictionary(typ Type, bufferSize int) *byteArrayDictionary {
func newByteArrayDictionary(typ Type, columnIndex int16, bufferSize int) *byteArrayDictionary {
return &byteArrayDictionary{
typ: typ,
values: encoding.MakeByteArrayList(dictCap(bufferSize, 16)),
typ: typ,
byteArrayPage: byteArrayPage{
values: encoding.MakeByteArrayList(dictCap(bufferSize, 16)),
columnIndex: columnIndex,
},
}
}

func readByteArrayDictionary(typ Type, columnIndex int16, numValues int, decoder encoding.Decoder) (Dictionary, error) {
d := &byteArrayDictionary{
typ: typ,
byteArrayPage: byteArrayPage{
values: encoding.MakeByteArrayList(atLeastOne(numValues)),
columnIndex: columnIndex,
},
}

for {
if d.values.Len() == d.values.Cap() {
d.values.Grow(d.values.Len())
}
_, err := decoder.DecodeByteArray(&d.values)
if err != nil {
if err == io.EOF {
err = nil
}
return d, err
}
}
}

Expand Down Expand Up @@ -141,69 +164,86 @@ func (d *byteArrayDictionary) Bounds(indexes []int32) (min, max Value) {
return min, max
}

func (d *byteArrayDictionary) ReadFrom(decoder encoding.Decoder) error {
d.Reset()
for {
if d.values.Len() == d.values.Cap() {
d.values.Grow(d.values.Len())
}
_, err := decoder.DecodeByteArray(&d.values)
if err != nil {
if err == io.EOF {
err = nil
}
return err
}
}
}

func (d *byteArrayDictionary) WriteTo(encoder encoding.Encoder) error {
if err := encoder.EncodeByteArray(d.values); err != nil {
return fmt.Errorf("writing parquet dictionary of %d binary values: %w", d.Len(), err)
}
return nil
}

func (d *byteArrayDictionary) Reset() {
d.values.Reset()
d.index = nil
}

func (d *byteArrayDictionary) Page() BufferedPage {
return &d.byteArrayPage
}

type fixedLenByteArrayDictionary struct {
typ Type
size int
values []byte
index map[string]int32
fixedLenByteArrayPage
typ Type
index map[string]int32
}

func newFixedLenByteArrayDictionary(typ Type, bufferSize int) *fixedLenByteArrayDictionary {
func newFixedLenByteArrayDictionary(typ Type, columnIndex int16, bufferSize int) *fixedLenByteArrayDictionary {
size := typ.Length()
return &fixedLenByteArrayDictionary{
typ: typ,
size: size,
values: make([]byte, 0, dictCap(bufferSize, size)*size),
typ: typ,
fixedLenByteArrayPage: fixedLenByteArrayPage{
size: size,
data: make([]byte, 0, dictCap(bufferSize, size)*size),
columnIndex: columnIndex,
},
}
}

func readFixedLenByteArrayDictionary(typ Type, columnIndex int16, numValues int, decoder encoding.Decoder) (Dictionary, error) {
size := typ.Length()

d := &fixedLenByteArrayDictionary{
typ: typ,
fixedLenByteArrayPage: fixedLenByteArrayPage{
size: size,
data: make([]byte, 0, atLeastOne(numValues)*size),
columnIndex: columnIndex,
},
}

for {
if len(d.data) == cap(d.data) {
newValues := make([]byte, len(d.data), 2*cap(d.data))
copy(newValues, d.data)
d.data = newValues
}

n, err := decoder.DecodeFixedLenByteArray(d.size, d.data[len(d.data):cap(d.data)])
if n > 0 {
d.data = d.data[:len(d.data)+(n*d.size)]
}

if err == io.EOF {
return d, nil
}
if err != nil {
return nil, fmt.Errorf("reading parquet dictionary of fixed-length binary values of size %d: %w", d.size, err)
}
}

}

func (d *fixedLenByteArrayDictionary) Type() Type { return newIndexedType(d.typ, d) }

func (d *fixedLenByteArrayDictionary) Len() int { return len(d.values) / d.size }
func (d *fixedLenByteArrayDictionary) Len() int { return len(d.data) / d.size }

func (d *fixedLenByteArrayDictionary) Index(i int32) Value {
return makeValueBytes(FixedLenByteArray, d.value(i))
}

func (d *fixedLenByteArrayDictionary) value(i int32) []byte {
return d.values[int(i)*d.size : int(i+1)*d.size]
return d.data[int(i)*d.size : int(i+1)*d.size]
}

func (d *fixedLenByteArrayDictionary) Insert(indexes []int32, values []Value) {
_ = indexes[:len(values)]

if d.index == nil {
d.index = make(map[string]int32, cap(d.values)/d.size)
for i, j := 0, int32(0); i < len(d.values); i += d.size {
d.index[bits.BytesToString(d.values[i:i+d.size])] = j
d.index = make(map[string]int32, cap(d.data)/d.size)
for i, j := 0, int32(0); i < len(d.data); i += d.size {
d.index[bits.BytesToString(d.data[i:i+d.size])] = j
j++
}
}
Expand All @@ -214,9 +254,9 @@ func (d *fixedLenByteArrayDictionary) Insert(indexes []int32, values []Value) {
index, exists := d.index[string(value)]
if !exists {
index = int32(d.Len())
start := len(d.values)
d.values = append(d.values, value...)
d.index[bits.BytesToString(d.values[start:])] = index
start := len(d.data)
d.data = append(d.data, value...)
d.index[bits.BytesToString(d.data[start:])] = index
}

indexes[i] = index
Expand Down Expand Up @@ -250,43 +290,15 @@ func (d *fixedLenByteArrayDictionary) Bounds(indexes []int32) (min, max Value) {
return min, max
}

func (d *fixedLenByteArrayDictionary) ReadFrom(decoder encoding.Decoder) error {
d.Reset()
for {
if len(d.values) == cap(d.values) {
newValues := make([]byte, len(d.values), 2*cap(d.values))
copy(newValues, d.values)
d.values = newValues
}

n, err := decoder.DecodeFixedLenByteArray(d.size, d.values[len(d.values):cap(d.values)])
if n > 0 {
d.values = d.values[:len(d.values)+(n*d.size)]
}

if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("reading parquet dictionary of fixed-length binary values of size %d: %w", d.size, err)
}
return err
}
}
}

func (d *fixedLenByteArrayDictionary) WriteTo(encoder encoding.Encoder) error {
if err := encoder.EncodeFixedLenByteArray(d.size, d.values); err != nil {
return fmt.Errorf("writing parquet dictionary of %d fixed-length binary values of size %d: %w", d.Len(), d.size, err)
}
return nil
}

func (d *fixedLenByteArrayDictionary) Reset() {
d.values = d.values[:0]
d.data = d.data[:0]
d.index = nil
}

func (d *fixedLenByteArrayDictionary) Page() BufferedPage {
return &d.fixedLenByteArrayPage
}

type indexedType struct {
Type
dict Dictionary
Expand Down
Loading

0 comments on commit 542d8f0

Please sign in to comment.