Skip to content

Commit

Permalink
refinements on logical type handling
Browse files Browse the repository at this point in the history
* standardized function naming ${type}From${Type}

* factored out getSize function and placed in fixed.go
  • Loading branch information
Karrick S. McDermott committed Aug 22, 2018
1 parent 030f59b commit 05dc57f
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 84 deletions.
20 changes: 10 additions & 10 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,45 +185,45 @@ func newSymbolTable() map[string]*Codec {
typeName: &name{"long.timestamp-millis", nullNamespace},
schemaOriginal: "long",
schemaCanonical: "long",
nativeFromTextual: timeStampMillisToNative(longNativeFromTextual),
nativeFromTextual: nativeFromTimeStampMillis(longNativeFromTextual),
binaryFromNative: timeStampMillisFromNative(longBinaryFromNative),
nativeFromBinary: timeStampMillisToNative(longNativeFromBinary),
nativeFromBinary: nativeFromTimeStampMillis(longNativeFromBinary),
textualFromNative: timeStampMillisFromNative(longTextualFromNative),
},
"long.timestamp-micros": {
typeName: &name{"long.timestamp-micros", nullNamespace},
schemaOriginal: "long",
schemaCanonical: "long",
nativeFromTextual: timeStampMicrosToNative(longNativeFromTextual),
nativeFromTextual: nativeFromTimeStampMicros(longNativeFromTextual),
binaryFromNative: timeStampMicrosFromNative(longBinaryFromNative),
nativeFromBinary: timeStampMicrosToNative(longNativeFromBinary),
nativeFromBinary: nativeFromTimeStampMicros(longNativeFromBinary),
textualFromNative: timeStampMicrosFromNative(longTextualFromNative),
},
"int.time-millis": {
typeName: &name{"int.time-millis", nullNamespace},
schemaOriginal: "int",
schemaCanonical: "int",
nativeFromTextual: timeMillisToNative(intNativeFromTextual),
nativeFromTextual: nativeFromTimeMillis(intNativeFromTextual),
binaryFromNative: timeMillisFromNative(intBinaryFromNative),
nativeFromBinary: timeMillisToNative(intNativeFromBinary),
nativeFromBinary: nativeFromTimeMillis(intNativeFromBinary),
textualFromNative: timeMillisFromNative(intTextualFromNative),
},
"long.time-micros": {
typeName: &name{"long.time-micros", nullNamespace},
schemaOriginal: "long",
schemaCanonical: "long",
nativeFromTextual: timeMicrosToNative(longNativeFromTextual),
nativeFromTextual: nativeFromTimeMicros(longNativeFromTextual),
binaryFromNative: timeMicrosFromNative(longBinaryFromNative),
nativeFromBinary: timeMicrosToNative(longNativeFromBinary),
nativeFromBinary: nativeFromTimeMicros(longNativeFromBinary),
textualFromNative: timeMicrosFromNative(longTextualFromNative),
},
"int.date": {
typeName: &name{"int.date", nullNamespace},
schemaOriginal: "int",
schemaCanonical: "int",
nativeFromTextual: dateToNative(intNativeFromTextual),
nativeFromTextual: nativeFromDate(intNativeFromTextual),
binaryFromNative: dateFromNative(intBinaryFromNative),
nativeFromBinary: dateToNative(intNativeFromBinary),
nativeFromBinary: nativeFromDate(intNativeFromBinary),
textualFromNative: dateFromNative(intTextualFromNative),
},
}
Expand Down
48 changes: 28 additions & 20 deletions fixed.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,9 @@ func makeFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap m
if err != nil {
return nil, fmt.Errorf("Fixed ought to have valid name: %s", err)
}
// Fixed type must have size
sizeRaw, ok := schemaMap["size"]
if !ok {
return nil, fmt.Errorf("Fixed %q ought to have size key", c.typeName)
}
var size uint
switch val := sizeRaw.(type) {
case string:
s, err := strconv.ParseUint(val, 10, 0)
if err != nil {
return nil, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", c.typeName, sizeRaw)
}
size = uint(s)
case float64:
if val <= 0 {
return nil, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", c.typeName, sizeRaw)
}
size = uint(val)
default:
return nil, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", c.typeName, sizeRaw)
size, err := sizeFromSchemaMap(c.typeName, schemaMap)
if err != nil {
return nil, err
}

c.nativeFromBinary = func(buf []byte) (interface{}, []byte, error) {
Expand Down Expand Up @@ -101,3 +84,28 @@ func makeFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap m

return c, nil
}

func sizeFromSchemaMap(typeName *name, schemaMap map[string]interface{}) (uint, error) {
// Fixed type must have size
sizeRaw, ok := schemaMap["size"]
if !ok {
return 0, fmt.Errorf("Fixed %q ought to have size key", typeName)
}
var size uint
switch val := sizeRaw.(type) {
case string:
s, err := strconv.ParseUint(val, 10, 0)
if err != nil {
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
size = uint(s)
case float64:
if val <= 0 {
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
size = uint(val)
default:
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
return size, nil
}
108 changes: 54 additions & 54 deletions logical_type.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package goavro

import (
"errors"
"fmt"
"math"
"math/big"
"strconv"
"time"
)

Expand All @@ -14,7 +14,7 @@ type fromNativeFn func([]byte, interface{}) ([]byte, error)
//////////////////////////////////////////////////////////////////////////////////////////////
// date logical type - to/from time.Time, time.UTC location
//////////////////////////////////////////////////////////////////////////////////////////////
func dateToNative(fn toNativeFn) toNativeFn {
func nativeFromDate(fn toNativeFn) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
l, b, err := fn(bytes)
if err != nil {
Expand Down Expand Up @@ -47,7 +47,7 @@ func dateFromNative(fn fromNativeFn) fromNativeFn {
//////////////////////////////////////////////////////////////////////////////////////////////
// time-millis logical type - to/from time.Time, time.UTC location
//////////////////////////////////////////////////////////////////////////////////////////////
func timeMillisToNative(fn toNativeFn) toNativeFn {
func nativeFromTimeMillis(fn toNativeFn) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
l, b, err := fn(bytes)
if err != nil {
Expand Down Expand Up @@ -76,7 +76,7 @@ func timeMillisFromNative(fn fromNativeFn) fromNativeFn {
//////////////////////////////////////////////////////////////////////////////////////////////
// time-micros logical type - to/from time.Time, time.UTC location
//////////////////////////////////////////////////////////////////////////////////////////////
func timeMicrosToNative(fn toNativeFn) toNativeFn {
func nativeFromTimeMicros(fn toNativeFn) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
l, b, err := fn(bytes)
if err != nil {
Expand Down Expand Up @@ -105,7 +105,7 @@ func timeMicrosFromNative(fn fromNativeFn) fromNativeFn {
//////////////////////////////////////////////////////////////////////////////////////////////
// timestamp-millis logical type - to/from time.Time, time.UTC location
//////////////////////////////////////////////////////////////////////////////////////////////
func timeStampMillisToNative(fn toNativeFn) toNativeFn {
func nativeFromTimeStampMillis(fn toNativeFn) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
l, b, err := fn(bytes)
if err != nil {
Expand Down Expand Up @@ -135,7 +135,7 @@ func timeStampMillisFromNative(fn fromNativeFn) fromNativeFn {
//////////////////////////////////////////////////////////////////////////////////////////////
// timestamp-micros logical type - to/from time.Time, time.UTC location
//////////////////////////////////////////////////////////////////////////////////////////////
func timeStampMicrosToNative(fn toNativeFn) toNativeFn {
func nativeFromTimeStampMicros(fn toNativeFn) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
l, b, err := fn(bytes)
if err != nil {
Expand Down Expand Up @@ -169,26 +169,56 @@ func timeStampMicrosFromNative(fn fromNativeFn) fromNativeFn {
/////////////////////////////////////////////////////////////////////////////////////////////
type makeCodecFn func(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error)

func precisionAndScaleFromSchemaMap(schemaMap map[string]interface{}) (int, int, error) {
p1, ok := schemaMap["precision"]
if !ok {
return 0, 0, errors.New("cannot create decimal logical type without precision")
}
p2, ok := p1.(float64)
if !ok {
return 0, 0, fmt.Errorf("cannot create decimal logical type with wrong precision type; expected: float64; received: %T", p1)
}
p3 := int(p2)
if p3 <= 1 {
return 0, 0, fmt.Errorf("cannot create decimal logical type when precision is less than one: %d", p3)
}
var s3 int // scale defaults to 0 if not set
if s1, ok := schemaMap["scale"]; ok {
s2, ok := s1.(float64)
if !ok {
return 0, 0, fmt.Errorf("cannot create decimal logical type with wrong precision type; expected: float64; received: %T", p1)
}
s3 = int(s2)
if s3 < 0 {
return 0, 0, fmt.Errorf("cannot create decimal logical type when scale is less than zero: %d", s3)
}
if s3 > p3 {
return 0, 0, fmt.Errorf("cannot create decimal logical type when scale is larger than precision: %d > %d", s3, p3)
}
}
return p3, s3, nil
}

var one = big.NewInt(1)

func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
if err != nil {
return nil, err
}
schemaMap["name"] = "bytes.decimal"
c, err := registerNewCodec(st, schemaMap, enclosingNamespace)
if err != nil {
return nil, fmt.Errorf("Bytes ought to have valid name: %s", err)
}
precision := schemaMap["precision"]
scale := schemaMap["scale"]
p := int(precision.(float64))
s := int(scale.(float64))
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, p, s)
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, p, s)
c.nativeFromBinary = decimalBytesToNative(bytesNativeFromBinary, p, s)
c.nativeFromTextual = decimalBytesToNative(bytesNativeFromTextual, p, s)
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, precision, scale)
c.nativeFromTextual = nativeFromDecimalBytes(bytesNativeFromTextual, precision, scale)
return c, nil
}

func decimalBytesToNative(fn toNativeFn, precision, scale int) toNativeFn {
func nativeFromDecimalBytes(fn toNativeFn, precision, scale int) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
d, b, err := fn(bytes)
if err != nil {
Expand All @@ -215,12 +245,6 @@ func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, prec
if !ok {
return nil, fmt.Errorf("cannot transform to bytes, expected *big.Rat, received %T", d)
}
if precision < 0 {
return nil, fmt.Errorf("cannot transform to bytes, expected precision to be greater than 0")
}
if scale < 0 || scale > precision {
return nil, fmt.Errorf("cannot transform to bytes, expected scale to be 0 or scale to be greater than precision")
}
// we reduce accuracy to precision by dividing and multiplying by digit length
num := big.NewInt(0).Set(r.Num())
denom := big.NewInt(0).Set(r.Denom())
Expand All @@ -238,50 +262,26 @@ func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, prec
}

func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
if err != nil {
return nil, err
}
schemaMap["name"] = "fixed.decimal"
c, err := makeFixedCodec(st, enclosingNamespace, schemaMap)
if err != nil {
return nil, err
}
precision := schemaMap["precision"]
scale := schemaMap["scale"]
size, err := getSize(c.typeName, schemaMap)
size, err := sizeFromSchemaMap(c.typeName, schemaMap)
if err != nil {
return nil, err
}
p := int(precision.(float64))
s := int(scale.(float64))
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), p, s)
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), p, s)
c.nativeFromBinary = decimalBytesToNative(c.nativeFromBinary, p, s)
c.nativeFromTextual = decimalBytesToNative(c.nativeFromTextual, p, s)
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, precision, scale)
c.nativeFromTextual = nativeFromDecimalBytes(c.nativeFromTextual, precision, scale)
return c, nil
}

func getSize(typeName *name, schemaMap map[string]interface{}) (uint, error) {
// Fixed type must have size
sizeRaw, ok := schemaMap["size"]
if !ok {
return 0, fmt.Errorf("Fixed %q ought to have size key", typeName)
}
var size uint
switch val := sizeRaw.(type) {
case string:
s, err := strconv.ParseUint(val, 10, 0)
if err != nil {
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
size = uint(s)
case float64:
if val <= 0 {
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
size = uint(val)
default:
return 0, fmt.Errorf("Fixed %q size ought to be number greater than zero: %v", typeName, sizeRaw)
}
return size, nil
}
func padBytes(bytes []byte, fixedSize uint) []byte {
s := int(fixedSize)
padded := make([]byte, s, s)
Expand Down
2 changes: 2 additions & 0 deletions logical_type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (

func TestSchemaLogicalType(t *testing.T) {
testSchemaValid(t, `{"type": "long", "logicalType": "timestamp-millis"}`)
testSchemaInvalid(t, `{"type": "bytes", "logicalType": "decimal"}`, "precision")
testSchemaInvalid(t, `{"type": "fixed", "size": 16, "logicalType": "decimal"}`, "precision")
}

func TestTimeStampMillisLogicalTypeEncode(t *testing.T) {
Expand Down

0 comments on commit 05dc57f

Please sign in to comment.