Skip to content

Commit

Permalink
Add ocf metadata (hamba#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Feb 28, 2020
1 parent 05616fb commit 0096ff9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
23 changes: 19 additions & 4 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Decoder struct {
reader *avro.Reader
resetReader *bytesx.ResetReader
decoder *avro.Decoder
meta map[string][]byte
sync [16]byte

codec Codec
Expand Down Expand Up @@ -83,11 +84,17 @@ func NewDecoder(r io.Reader) (*Decoder, error) {
reader: reader,
resetReader: decReader,
decoder: avro.NewDecoderForSchema(schema, decReader),
meta: h.Meta,
sync: h.Sync,
codec: codec,
}, nil
}

// Metadata exposes the metadata map held by the decoder.
//
// The decoder's own map is handed back rather than a copy.
func (d *Decoder) Metadata() map[string][]byte {
	meta := d.meta
	return meta
}

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
Expand Down Expand Up @@ -150,6 +157,7 @@ func (d *Decoder) readBlock() int64 {
// encoderConfig collects the settings that EncoderFunc options modify
// before NewEncoder builds the container file header.
type encoderConfig struct {
// BlockLength is initialised to 100 by NewEncoder unless an option changes it.
BlockLength int
// CodecName is initialised to Null by NewEncoder; its value is written into
// the header metadata under codecKey.
CodecName CodecName
// Metadata is used as the header's Meta map. NewEncoder also writes the
// schemaKey and codecKey entries into it.
Metadata map[string][]byte
}

// EncoderFunc represents a configuration function for Encoder.
Expand All @@ -169,6 +177,13 @@ func WithCodec(codec CodecName) EncoderFunc {
}
}

// WithMetadata sets the metadata on the encoder header.
//
// The entries of meta are copied into a fresh map instead of storing meta
// itself. NewEncoder writes the schema and codec entries into the configured
// metadata map, so storing the caller's map directly would panic for a nil
// map and would otherwise add those entries to the caller's map as a side
// effect. Only the map is copied; the []byte values are shared with meta.
func WithMetadata(meta map[string][]byte) EncoderFunc {
	return func(cfg *encoderConfig) {
		// Reserve room for the two entries NewEncoder adds (schema and codec).
		copied := make(map[string][]byte, len(meta)+2)
		for key, value := range meta {
			copied[key] = value
		}
		cfg.Metadata = copied
	}
}

// Encoder writes Avro container file to an output stream.
type Encoder struct {
writer *avro.Writer
Expand All @@ -192,19 +207,19 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
Metadata: map[string][]byte{},
}
for _, opt := range opts {
opt(&cfg)
}

writer := avro.NewWriter(w, 512)

cfg.Metadata[schemaKey] = []byte(schema.String())
cfg.Metadata[codecKey] = []byte(cfg.CodecName)
header := Header{
Magic: magicBytes,
Meta: map[string][]byte{
schemaKey: []byte(schema.String()),
codecKey: []byte(cfg.CodecName),
},
Meta: cfg.Metadata,
}
_, _ = rand.Read(header.Sync[:])
writer.WriteVal(HeaderSchema, header)
Expand Down
17 changes: 17 additions & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,23 @@ func TestEncoder_CloseHandlesWriteBlockError(t *testing.T) {
assert.Error(t, err)
}

// TestEncodeDecodeMetadata verifies that metadata supplied through
// WithMetadata is written by the encoder and can be read back from the
// decoder via Metadata.
func TestEncodeDecodeMetadata(t *testing.T) {
	buf := &bytes.Buffer{}
	enc, err := ocf.NewEncoder(`"long"`, buf, ocf.WithMetadata(map[string][]byte{
		"test": []byte("foo"),
	}))
	if err != nil {
		// Stop here: enc is unusable and the calls below would dereference nil.
		t.Fatalf("creating encoder: %v", err)
	}

	err = enc.Encode(int64(1))
	assert.NoError(t, err)

	// Close must succeed, otherwise the buffer may not hold a complete file.
	err = enc.Close()
	assert.NoError(t, err)

	dec, err := ocf.NewDecoder(buf)
	if err != nil {
		// Stop here: dec would be nil and dec.Metadata() would panic.
		t.Fatalf("creating decoder: %v", err)
	}

	assert.Equal(t, []byte("foo"), dec.Metadata()["test"])
}

// errorWriter is an empty struct that the tests use as a writer through its
// Write method.
// NOTE(review): presumably Write always returns an error — confirm against
// the method body.
type errorWriter struct{}

func (*errorWriter) Write(p []byte) (n int, err error) {
Expand Down

0 comments on commit 0096ff9

Please sign in to comment.