Commit 501bd57

goavro.UseCodec() method which allows Writer to be instantiated that reuses codec already created

karrick committed Mar 30, 2015
1 parent fc4b02d commit 501bd57
Showing 6 changed files with 88 additions and 50 deletions.
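
For orientation, the pattern this commit enables looks roughly like the sketch below. It is not part of the commit: the one-field record schema and the output file names are placeholders, and the sketch only uses calls that appear in the diff (NewCodec, NewWriter, Compression, ToWriter, UseCodec). Compile the schema once, then hand the same Codec to every Writer instead of re-parsing the JSON with WriterSchema each time.

package main

import (
	"log"
	"os"

	"github.com/linkedin/goavro"
)

// placeholder schema; the examples in this commit use a richer "comments" record
const recordSchema = `{"type": "record", "name": "comment", "fields": [{"name": "body", "type": "string"}]}`

func main() {
	// compile the schema exactly once
	codec, err := goavro.NewCodec(recordSchema)
	if err != nil {
		log.Fatal(err)
	}
	// reuse the same codec for every Writer
	for _, name := range []string{"one.avro", "two.avro"} {
		fh, err := os.Create(name)
		if err != nil {
			log.Fatal(err)
		}
		fw, err := goavro.NewWriter(
			goavro.Compression(goavro.CompressionSnappy),
			goavro.ToWriter(fh),
			goavro.UseCodec(codec))
		if err != nil {
			log.Fatal(err)
		}
		_ = fw // write records and close the Writer here, as in examples/file/writer.go
		fh.Close()
	}
}

Before this change each Writer compiled its own codec inside the WriterSchema setter; with UseCodec the schema JSON is unmarshaled and validated a single time.
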
19 changes: 16 additions & 3 deletions codec.go
@@ -101,6 +101,7 @@ type Encoder interface {
type Codec interface {
	Decoder
	Encoder
+	Schema() string
}

// CodecSetter functions are those which are used to modify a
@@ -111,9 +112,10 @@ type decoderFunction func(io.Reader) (interface{}, error)
type encoderFunction func(io.Writer, interface{}) error

type codec struct {
-	nm *name
-	df decoderFunction
-	ef encoderFunction
+	nm     *name
+	df     decoderFunction
+	ef     encoderFunction
+	schema string
}

// String returns a string representation of the codec.
@@ -163,10 +165,16 @@ type symtab map[string]*codec // map full name to codec
// return nil, err
// }
func NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error) {
+	// unmarshal into schema blob
	var schema interface{}
	if err := json.Unmarshal([]byte(someJSONSchema), &schema); err != nil {
		return nil, &ErrSchemaParse{"cannot unmarshal JSON", err}
	}
+	// remarshal back into compressed json
+	compressedSchema, err := json.Marshal(schema)
+	if err != nil {
+		return nil, fmt.Errorf("cannot marshal schema: %v", err)
+	}

	// each codec gets a unified namespace of symbols to
	// respective codecs
@@ -183,6 +191,7 @@ func NewCodec(someJSONSchema string, setters ...CodecSetter) (Codec, error) {
			return nil, err
		}
	}
+	newCodec.schema = string(compressedSchema)
	return newCodec, nil
}

@@ -200,6 +209,10 @@ func (c codec) Encode(w io.Writer, datum interface{}) error {
	return c.ef(w, datum)
}

+func (c codec) Schema() string {
+	return c.schema
+}
+
var (
	nullCodec, booleanCodec, intCodec, longCodec, floatCodec, doubleCodec, bytesCodec, stringCodec *codec
)
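
As a hedged illustration of the new accessor (not from the commit): NewCodec now re-marshals the schema it was given and keeps the result, and Schema() returns that compacted string, which ocf_writer.go below embeds in the file header. The tiny record schema here is a placeholder.

package main

import (
	"fmt"
	"log"

	"github.com/linkedin/goavro"
)

func main() {
	codec, err := goavro.NewCodec(`{
	    "type": "record", "name": "comment",
	    "fields": [{"name": "body", "type": "string"}]
	}`)
	if err != nil {
		log.Fatal(err)
	}
	// Schema() returns the schema after the json.Unmarshal/json.Marshal round
	// trip in NewCodec: whitespace is gone and json.Marshal sorts the keys, e.g.
	// {"fields":[{"name":"body","type":"string"}],"name":"comment","type":"record"}
	fmt.Println(codec.Schema())
}
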
2 changes: 1 addition & 1 deletion examples/file/reader.go
@@ -34,7 +34,7 @@ func main() {
		}
		fh, err := os.Open(arg)
		if err != nil {
-			log.Fatal("cannot open file: ", err)
+			log.Fatal(err)
		}
		dumpReader(fh)
		fh.Close()
44 changes: 36 additions & 8 deletions examples/file/writer.go
@@ -20,12 +20,12 @@ package main

import (
	"github.com/linkedin/goavro"
+	"io"
	"log"
	"os"
)

-func main() {
-	recordSchema := `
+const recordSchema = `
{
    "type": "record",
    "name": "comments",
@@ -50,16 +50,44 @@ func main() {
    ]
}
`
-	fh, err := os.Create("test.avro")
+
+var (
+	codec goavro.Codec
+)
+
+func init() {
+	var err error
+	// If you want speed, create the codec one time for each
+	// schema and reuse it to create multiple Writer instances.
+	codec, err = goavro.NewCodec(recordSchema)
	if err != nil {
-		log.Fatal("cannot create file: ", err)
+		log.Fatal(err)
	}
-	defer fh.Close()
+}
+
+func main() {
+	if len(os.Args) > 1 {
+		for i, arg := range os.Args {
+			if i == 0 {
+				continue
+			}
+			fh, err := os.Create(arg)
+			if err != nil {
+				log.Fatal(err)
+			}
+			dumpWriter(fh, codec)
+			fh.Close()
+		}
+	} else {
+		dumpWriter(os.Stdout, codec)
+	}
+}
+
+func dumpWriter(w io.Writer, codec goavro.Codec) {
	fw, err := goavro.NewWriter(
		goavro.BlockSize(13), // just because
		goavro.Compression(goavro.CompressionSnappy),
-		goavro.WriterSchema(recordSchema),
-		goavro.ToWriter(fh))
+		goavro.ToWriter(w),
+		goavro.UseCodec(codec))
	if err != nil {
		log.Fatal(err)
	}
22 changes: 18 additions & 4 deletions examples/net/server.go
@@ -50,6 +50,20 @@ const recordSchema = `
}
`

+var (
+	codec goavro.Codec
+)
+
+func init() {
+	var err error
+	// If you want speed, create the codec one time for each
+	// schema and reuse it to create multiple Writer instances.
+	codec, err = goavro.NewCodec(recordSchema)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
func main() {
	ln, err := net.Listen("tcp", ":8080")
	if err != nil {
@@ -60,15 +74,15 @@ func main() {
		if err != nil {
			log.Fatal(err)
		}
-		go serveClient(conn)
+		go serveClient(conn, codec)
	}
}

-func serveClient(conn net.Conn) {
+func serveClient(conn net.Conn, codec goavro.Codec) {
	fw, err := goavro.NewWriter(
		goavro.Compression(goavro.CompressionSnappy),
-		goavro.WriterSchema(recordSchema),
-		goavro.ToWriter(conn))
+		goavro.ToWriter(conn),
+		goavro.UseCodec(codec))
	if err != nil {
		log.Fatal(err)
	}
49 changes: 16 additions & 33 deletions ocf_writer.go
@@ -23,7 +23,6 @@ import (
	"bytes"
	"code.google.com/p/snappy-go/snappy"
	"compress/flate"
-	"encoding/json"
	"fmt"
	"io"
	"log"
@@ -65,6 +64,16 @@ func ToWriter(w io.Writer) WriterSetter {
	}
}

+func UseCodec(codec Codec) WriterSetter {
+	return func(fw *Writer) error {
+		if codec != nil {
+			fw.dataCodec = codec
+			return nil
+		}
+		return fmt.Errorf("invalid Codec")
+	}
+}
+
// BufferToWriter specifies which io.Writer is the target of the
// Writer stream, and creates a bufio.Writer around that
// io.Writer.
@@ -115,21 +124,17 @@ func Sync(someSync []byte) WriterSetter {

// WriterSchema is used to set the Avro schema of a new instance.
func WriterSchema(someSchema string) WriterSetter {
-	return func(fw *Writer) error {
-		var err error
-		fw.DataSchema = someSchema
-		fw.dataCodec, err = NewCodec(fw.DataSchema)
-		if err != nil {
-			return fmt.Errorf("error compiling schema: %v", err)
+	return func(fw *Writer) (err error) {
+		if fw.dataCodec, err = NewCodec(someSchema); err != nil {
+			return
		}
-		return nil
+		return
	}
}

// Writer structure contains data necessary to write Avro files.
type Writer struct {
	CompressionCodec string
-	DataSchema       string
	Sync             []byte
	blockSize        int64
	buffered         bool
@@ -192,16 +197,9 @@ func NewWriter(setters ...WriterSetter) (*Writer, error) {
	if !IsCompressionCodecSupported(fw.CompressionCodec) {
		return nil, &ErrWriterInit{Message: fmt.Sprintf("unsupported codec: %s", fw.CompressionCodec)}
	}
-	if fw.DataSchema == "" {
+	if fw.dataCodec == nil {
		return nil, &ErrWriterInit{Message: "missing schema"}
	}
-	fw.DataSchema, err = compressJSON(fw.DataSchema)
-	if err != nil {
-		return nil, &ErrWriterInit{Err: err}
-	}
-	if fw.dataCodec, err = NewCodec(fw.DataSchema); err != nil {
-		return nil, &ErrWriterInit{Err: err}
-	}
	if fw.Sync == nil {
		// create random sequence of bytes for file sync marker
		fw.Sync = make([]byte, syncLength)
@@ -247,7 +245,7 @@ func (fw *Writer) writeHeader() (err error) {
	}
	// header metadata
	hm := make(map[string]interface{})
-	hm["avro.schema"] = []byte(fw.DataSchema)
+	hm["avro.schema"] = []byte(fw.dataCodec.Schema())
	hm["avro.codec"] = []byte(fw.CompressionCodec)
	if err = metadataCodec.Encode(fw.w, hm); err != nil {
		return
@@ -256,21 +254,6 @@
	return
}

-func compressJSON(schemaJSON string) (string, error) {
-	var err error
-	var compressed []byte
-	var schema interface{}
-	// unmarshal into schema blob
-	if err = json.Unmarshal([]byte(schemaJSON), &schema); err != nil {
-		return "", fmt.Errorf("cannot unmarshal schema string: %#v: %v", schemaJSON, err)
-	}
-	// remarshal back into compressed json
-	if compressed, err = json.Marshal(schema); err != nil {
-		return "", fmt.Errorf("cannot marshal schema: %v", err)
-	}
-	return string(compressed), nil
-}
-
type writerBlock struct {
	items   []interface{}
	encoded *bytes.Buffer
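
Two consequences of the changes above, sketched for illustration (not part of the commit, and the exact error strings depend on how ErrWriterInit wraps the underlying messages): UseCodec rejects a nil Codec, and a Writer configured with neither WriterSchema nor UseCodec still fails because dataCodec stays nil.

package main

import (
	"fmt"
	"os"

	"github.com/linkedin/goavro"
)

func main() {
	// the new setter refuses a nil Codec ("invalid Codec")
	if _, err := goavro.NewWriter(goavro.ToWriter(os.Stdout), goavro.UseCodec(nil)); err != nil {
		fmt.Println(err)
	}
	// no WriterSchema and no UseCodec: dataCodec is nil, so NewWriter
	// reports the same "missing schema" error as before this commit
	if _, err := goavro.NewWriter(goavro.ToWriter(os.Stdout)); err != nil {
		fmt.Println(err)
	}
}
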
2 changes: 1 addition & 1 deletion ocf_writer_test.go
@@ -56,7 +56,7 @@ func TestNewWriterBailsMissingWriterSchema(t *testing.T) {

func TestNewWriterBailsInvalidWriterSchema(t *testing.T) {
	_, err := NewWriter(WriterSchema("this should not compile"))
-	checkError(t, err, "compiling schema")
+	checkError(t, err, "cannot parse schema")
}

func TestNewWriterBailsBadSync(t *testing.T) {
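
A companion test for the new setter might look like the sketch below. It is not part of the commit and makes a few assumptions: that bytes is imported in this file, that checkError performs the substring match the surrounding tests rely on, that NewWriter surfaces setter errors the way the WriterSchema tests expect, and that the Writer's Close method flushes as in the package examples.

func TestNewWriterAcceptsUseCodec(t *testing.T) {
	codec, err := NewCodec(`{"type": "record", "name": "rec", "fields": [{"name": "n", "type": "long"}]}`)
	if err != nil {
		t.Fatal(err)
	}
	// a Writer built from an existing codec should not require WriterSchema
	fw, err := NewWriter(UseCodec(codec), ToWriter(new(bytes.Buffer)))
	if err != nil {
		t.Errorf("Actual: %#v; Expected: %#v", err, nil)
	}
	if fw != nil {
		fw.Close()
	}
}

func TestNewWriterBailsNilCodec(t *testing.T) {
	_, err := NewWriter(UseCodec(nil))
	checkError(t, err, "invalid Codec")
}
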
