Skip to content

Commit

Permalink
Reader uses goavro specific error type structures
Browse files Browse the repository at this point in the history
  • Loading branch information
Karrick S. McDermott committed Mar 27, 2015
1 parent abf705a commit add83b1
Showing 1 changed file with 82 additions and 48 deletions.
130 changes: 82 additions & 48 deletions ocf_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,43 @@ import (
"fmt"
"io"
"io/ioutil"
"strings"
)

// ErrReaderInit is returned when an error is created during Reader
// initialization.
// ErrReaderInit is returned when Reader initialization encounters an error.
type ErrReaderInit struct {
// Message describes what went wrong; the Error method includes it in
// the text it returns.
Message string
// Err is the underlying error, or nil when there is none.
Err error
}

func (e *ErrReaderInit) Error() string {
func (e ErrReaderInit) Error() string {
if e.Err == nil {
return "cannot create Reader: " + e.Message
} else if e.Message == "" {
return "cannot create Reader: " + e.Err.Error()
return "cannot build " + e.Message
} else {
return "cannot create Reader: " + e.Message + "; " + e.Err.Error()
return "cannot build " + e.Message + ": " + e.Err.Error()
}
}

// newReaderInitError builds an *ErrReaderInit from a loosely typed
// argument list, so call sites can supply a message, an underlying
// error, or both.
//
// The arguments are interpreted as follows:
//
//   - If the final argument implements error, it is removed from the
//     list and stored in the Err field.
//   - If the first remaining argument is a string, it is the message.
//     When more arguments follow, it is used as a fmt.Sprintf format
//     for them; when none follow, it is stored verbatim so that a
//     pre-formatted message containing '%' is not mangled.
//   - If the first remaining argument is not a string, the remaining
//     arguments are rendered with fmt.Sprint instead of being dropped.
//
// Calling it with no arguments yields the message "no reason given".
func newReaderInitError(a ...interface{}) *ErrReaderInit {
	if len(a) == 0 {
		// ErrReaderInit.Error prepends its own prefix, so the message
		// must not repeat it.
		return &ErrReaderInit{Message: "no reason given"}
	}

	// Work on a local view of the arguments; the caller's slice is
	// only re-sliced, never modified.
	args := a

	// A trailing error is the underlying cause: pop it.
	var cause error
	if e, ok := args[len(args)-1].(error); ok {
		cause = e
		args = args[:len(args)-1]
	}

	var message string
	if len(args) > 0 {
		format, isString := args[0].(string)
		switch {
		case !isString:
			// No format string was given; keep the information anyway.
			message = fmt.Sprint(args...)
		case len(args) == 1:
			// A lone string is a finished message, not a format.
			message = format
		default:
			message = fmt.Sprintf(format, args[1:]...)
		}
	}
	return &ErrReaderInit{Message: message, Err: cause}
}

// ErrReaderBlockCount is returned when a reader detects an error
// while attempting to read the block count and block size.
type ErrReaderBlockCount struct {
Expand Down Expand Up @@ -123,42 +140,42 @@ func NewReader(setters ...ReaderSetter) (*Reader, error) {
for _, setter := range setters {
err = setter(fr)
if err != nil {
return nil, &ErrReaderInit{Err: err}
return nil, newReaderInitError(err)
}
}
if fr.r == nil {
return nil, &ErrReaderInit{Message: "must specify io.Reader"}
return nil, newReaderInitError("must specify io.Reader")
}
// read in header information and use it to initialize Reader
magic := make([]byte, 4)
_, err = fr.r.Read(magic)
if err != nil {
return nil, &ErrReaderInit{Message: "cannot read magic number", Err: err}
return nil, newReaderInitError("cannot read magic number", err)
}
if bytes.Compare(magic, []byte(magicBytes)) != 0 {
return nil, &ErrReaderInit{Message: "invalid magic number: " + string(magic)}
}
meta, err := decodeHeaderMetadata(fr.r)
if err != nil {
return nil, &ErrReaderInit{Message: "cannot read header metadata", Err: err}
return nil, newReaderInitError("cannot read header metadata", err)
}
fr.CompressionCodec, err = getHeaderString("avro.codec", meta)
if err != nil {
return nil, &ErrReaderInit{Message: "cannot read header metadata", Err: err}
return nil, newReaderInitError("cannot read header metadata", err)
}
if !IsCompressionCodecSupported(fr.CompressionCodec) {
return nil, &ErrWriterInit{Message: fmt.Sprintf("unsupported codec: %s", fr.CompressionCodec)}
return nil, newReaderInitError("unsupported codec: %s", fr.CompressionCodec)
}
fr.DataSchema, err = getHeaderString("avro.schema", meta)
if err != nil {
return nil, &ErrReaderInit{Message: "cannot read header metadata", Err: err}
return nil, newReaderInitError("cannot read header metadata", err)
}
if fr.dataCodec, err = NewCodec(fr.DataSchema); err != nil {
return nil, &ErrWriterInit{Message: "cannot compile schema", Err: err}
return nil, newReaderInitError("cannot compile schema", err)
}
fr.Sync = make([]byte, syncLength)
if _, err = fr.r.Read(fr.Sync); err != nil {
return nil, &ErrReaderInit{Message: "cannot read sync marker", Err: err}
return nil, newReaderInitError("cannot read sync marker", err)
}
// setup reading pipeline
toDecompress := make(chan *readerBlock)
Expand Down Expand Up @@ -209,8 +226,43 @@ type readerBlock struct {
r io.Reader
}

// ErrReader is returned when the reader encounters an error.
type ErrReader struct {
	// Message describes the operation that failed.
	Message string
	// Err is the underlying error, or nil when there is none.
	Err error
}

// Error implements the error interface. The text always starts with
// "cannot read from reader: ", followed by the message, the underlying
// error, or both separated by ": ". An empty Message is skipped so the
// result never contains a dangling ": : " separator.
func (e ErrReader) Error() string {
	const prefix = "cannot read from reader: "
	switch {
	case e.Err == nil:
		return prefix + e.Message
	case e.Message == "":
		return prefix + e.Err.Error()
	default:
		return prefix + e.Message + ": " + e.Err.Error()
	}
}

// newReaderError builds an *ErrReader from a loosely typed argument
// list, so call sites can supply a message, an underlying error, or
// both.
//
// The arguments are interpreted as follows:
//
//   - If the final argument implements error, it is removed from the
//     list and stored in the Err field.
//   - If the first remaining argument is a string, it is the message.
//     When more arguments follow, it is used as a fmt.Sprintf format
//     for them; when none follow, it is stored verbatim so that a
//     message the caller already formatted is not parsed a second time.
//   - If the first remaining argument is not a string, the remaining
//     arguments are rendered with fmt.Sprint instead of being dropped.
//
// Calling it with no arguments yields the message "no reason given".
func newReaderError(a ...interface{}) *ErrReader {
	if len(a) == 0 {
		return &ErrReader{Message: "no reason given"}
	}

	// Work on a local view of the arguments; the caller's slice is
	// only re-sliced, never modified.
	args := a

	// A trailing error is the underlying cause: pop it.
	var cause error
	if e, ok := args[len(args)-1].(error); ok {
		cause = e
		args = args[:len(args)-1]
	}

	var message string
	if len(args) > 0 {
		format, isString := args[0].(string)
		switch {
		case !isString:
			// No format string was given; keep the information anyway.
			message = fmt.Sprint(args...)
		case len(args) == 1:
			// A lone string is a finished message, not a format.
			message = format
		default:
			message = fmt.Sprintf(format, args[1:]...)
		}
	}
	return &ErrReader{Message: message, Err: cause}
}

func read(fr *Reader, toDecompress chan<- *readerBlock) {
// NOTE: these variables created outside loop to eliminate churn
// NOTE: these variables created outside loop to reduce churn
var lr io.Reader
var bits []byte
sync := make([]byte, syncLength)
Expand All @@ -222,51 +274,33 @@ func read(fr *Reader, toDecompress chan<- *readerBlock) {
for blockCount != 0 {
lr = io.LimitReader(fr.r, int64(blockSize))
if bits, err = ioutil.ReadAll(lr); err != nil {
err = fmt.Errorf("cannot read block: %v", err)
err = newReaderError("cannot read block", err)
break
}
toDecompress <- &readerBlock{datumCount: blockCount, r: bytes.NewReader(bits)}
if _, err = fr.r.Read(sync); err != nil {
err = fmt.Errorf("cannot read sync marker: %v", err)
err = newReaderError("cannot read sync marker", err)
break
}
if bytes.Compare(fr.Sync, sync) != 0 {
err = fmt.Errorf("sync marker mismatch: %#v != %#v", sync, fr.Sync)
err = newReaderError(fmt.Sprintf("sync marker mismatch: %#v != %#v", sync, fr.Sync))
break
}
if blockCount, blockSize, err = readBlockCountAndSize(fr.r); err != nil {
break
}
}
if err != nil {
fr.err = fmt.Errorf("error reading: %v", err)
fr.err = err
}
close(toDecompress)
}

func readBlockCountAndSize(r io.Reader) (blockCount, blockSize int, err error) {

// io.EOF
// io.ErrUnexpectedEOF

bc, err := longCodec.Decode(r)
switch err {
case io.EOF:
// we're done
// log.Printf("io.EOF: error type %T\n", err)
return 0, 0, nil
case nil:
// no error: ignore
default:
// TODO: this could be optimized
if strings.Contains(err.Error(), "EOF") {
// log.Printf("EOF: error type: %T", err)
// if ed, ok := err.(*ErrDecode); ok {
// log.Printf("EOF: error value: %#v", ed.Err)
// log.Printf("EOF: error string: %v", ed.Err)
// }
// we're done
return 0, 0, nil
if err != nil {
if ed, ok := err.(*ErrDecoder); ok && ed.Err.Error() == "EOF" {
return 0, 0, nil // we're done
}
return 0, 0, &ErrReaderBlockCount{err}
}
Expand All @@ -286,14 +320,14 @@ func decompress(fr *Reader, toDecompress <-chan *readerBlock, toDecode chan<- *r
rc = flate.NewReader(block.r)
bits, block.err = ioutil.ReadAll(rc)
if block.err != nil {
block.err = fmt.Errorf("cannot read from deflate: %v", block.err)
block.err = newReaderError("cannot read from deflate", block.err)
toDecode <- block
rc.Close() // ignore any close error
rc.Close() // already have the read error; ignore the close error
continue
}
block.err = rc.Close()
if block.err != nil {
block.err = fmt.Errorf("cannot close deflate: %v", block.err)
block.err = newReaderError("cannot close deflate", block.err)
toDecode <- block
continue
}
Expand All @@ -309,13 +343,13 @@ func decompress(fr *Reader, toDecompress <-chan *readerBlock, toDecode chan<- *r
for block := range toDecompress {
src, block.err = ioutil.ReadAll(block.r)
if block.err != nil {
block.err = fmt.Errorf("cannot read: %v", block.err)
block.err = newReaderError("cannot read", block.err)
toDecode <- block
continue
}
dst, block.err = snappy.Decode(dst, src)
if block.err != nil {
block.err = fmt.Errorf("cannot decompress: %v", block.err)
block.err = newReaderError("cannot decompress", block.err)
toDecode <- block
continue
}
Expand Down

0 comments on commit add83b1

Please sign in to comment.