Skip to content

Commit

Permalink
feat: support local timestamp logic types (hamba#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Jan 17, 2024
1 parent d25c1c8 commit 4ad91f8
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 67 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ linters:
- gocyclo
- goerr113
- gomnd
- gosmopolitan
- ireturn
- nestif
- nlreturn
Expand Down
52 changes: 27 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,33 @@ More examples in the [godoc](https://godoc.org/github.com/hamba/avro/v2).

#### Types Conversions

| Avro | Go Struct | Go Interface |
|-------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |
| Avro | Go Struct | Go Interface |
|-------------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `long.local-timestamp-millis` | `time.Time` | `time.Time` |
| `long.local-timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |

\* Please note that when the Go type is an unsigned integer care must be taken to ensure that information is not lost
when converting between the Avro type and Go type. For example, storing a *negative* number in Avro of `int = -100`
Expand Down
11 changes: 8 additions & 3 deletions codec_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func genericDecode(schema Schema, r *Reader) any {
return nil
}

// seems generic reader is not compatible with codec
// Generic reader returns a different result from the
// codec in the case of a big.Rat. Handle this.
if rTyp.Type1() == ratType {
dec := obj.(big.Rat)
return &dec
Expand Down Expand Up @@ -69,14 +70,18 @@ func genericReceiver(schema Schema) (unsafe.Pointer, reflect2.Type, error) {
case TimeMicros:
var v time.Duration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
var v int64
Expand Down
19 changes: 16 additions & 3 deletions codec_generic_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package avro
import (
"bytes"
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -76,6 +75,20 @@ func TestGenericDecode(t *testing.T) {
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Millis",
data: []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B},
schema: `{"type":"long","logicalType":"local-timestamp-millis"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Micros",
data: []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05},
schema: `{"type":"long","logicalType":"local-timestamp-micros"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Float",
data: []byte{0x33, 0x33, 0x93, 0x3F},
Expand Down Expand Up @@ -197,9 +210,9 @@ func TestGenericDecode(t *testing.T) {
},
}

for i, test := range tests {
for _, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
schema := MustParse(test.schema)
r := NewReader(bytes.NewReader(test.data), 10)

Expand Down
86 changes: 59 additions & 27 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/modern-go/reflect2"
)

//nolint:maintidx // Splitting this would not make it simpler.
func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
converter := resolveConverter(schema.(*PrimitiveSchema).actual)

Expand Down Expand Up @@ -120,24 +121,29 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
st := schema.Type()
ls := getLogicalSchema(schema)
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}

case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{
convert: converter.toLong,
}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{
convert: converter.toLong,
}

case Istpy1Rat && st == Bytes && lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{
local: true,
convert: converter.toLong,
}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{
local: true,
convert: converter.toLong,
}
case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
dec := ls.(*DecimalLogicalSchema)
return &bytesDecimalCodec{
prec: dec.Precision(), scale: dec.Scale(),
Expand Down Expand Up @@ -228,13 +234,10 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch {
case st == Int && lt == TimeMillis: // time.Duration
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}

case st == Long:
return &longCodec[int64]{}

default:
break
}
Expand All @@ -243,7 +246,6 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch schema.Type() {
case Double:
return &float32DoubleCodec{}

case Float:
return &float32Codec{}
}
Expand All @@ -269,24 +271,22 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
case reflect.Struct:
st := schema.Type()
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}
case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{}

case Istpy1Rat && st != Bytes || lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{local: true}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{local: true}
case typ.Type1().ConvertibleTo(ratType) && st != Bytes || lt == Decimal:
ls := getLogicalSchema(schema)
dec := ls.(*DecimalLogicalSchema)

return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}

default:
break
}
Expand Down Expand Up @@ -477,6 +477,7 @@ func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) {
}

type timestampMillisCodec struct {
local bool
convert func(*Reader) int64
}

Expand All @@ -489,15 +490,31 @@ func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
t := *((*time.Time)(ptr))
if c.local {
t = t.Local()
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6))
}

type timestampMicrosCodec struct {
local bool
convert func(*Reader) int64
}

Expand All @@ -510,11 +527,26 @@ func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
t := *((*time.Time)(ptr))
if c.local {
t = t.Local()
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3))
}

Expand Down
Loading

0 comments on commit 4ad91f8

Please sign in to comment.