Skip to content

Commit

Permalink
bson: Added Encoder and Decoder types for stream encoding/decoding. (g…
Browse files Browse the repository at this point in the history
…lobalsign#127)

* bson: Added Encoder and Decoder types for stream encoding/decoding.

Those types are analog to those found in json and yaml. They allow us to operate on io.Readers/io.Writers instead of raw byte slices. Streams are expected to be sequences of concatenated BSON documents: *.bson files from MongoDB dumps, for example.

* Stream: NewEncoder and NewDecoder now return pointers.

JSON and YAML do that too, so let's be consistent.

* Stream decoder: added checks on document size limits, and panic handler.

Strangely, the BSON spec defines the document size header as a signed int32, but:
- No document can be smaller than 5 bytes (size header + null terminator).
- MongoDB constrains BSON documents to 16 MiB at most.

Therefore, documents whose header doesn't obey those limits are discarded and Decode returns ErrInvalidDocumentSize.

In addition, we're reusing the handleErr panic handler in Decode to protect from unwanted panics in Unmarshal.

* Exported MinDocumentSize and MaxDocumentSize consts.
  • Loading branch information
maxnoel authored and domodwyer committed Apr 6, 2018
1 parent 69bef6a commit b5611a5
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 0 deletions.
90 changes: 90 additions & 0 deletions bson/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package bson

import (
"bytes"
"encoding/binary"
"fmt"
"io"
)

const (
// MinDocumentSize is the size of the smallest possible valid BSON document:
// an int32 size header + 0x00 (end of document).
MinDocumentSize = 5

// MaxDocumentSize is the largest possible size for a BSON document allowed by MongoDB,
// that is, 16 MiB (see https://docs.mongodb.com/manual/reference/limits/).
MaxDocumentSize = 16777216
)

// ErrInvalidDocumentSize is an error returned when a BSON document's header
// contains a size smaller than MinDocumentSize or greater than MaxDocumentSize.
type ErrInvalidDocumentSize struct {
DocumentSize int32
}

func (e ErrInvalidDocumentSize) Error() string {
return fmt.Sprintf("invalid document size %d", e.DocumentSize)
}

// A Decoder reads and decodes BSON values from an input stream.
type Decoder struct {
source io.Reader
}

// NewDecoder returns a new Decoder that reads from source.
// It does not add any extra buffering, and may not read data from source beyond the BSON values requested.
func NewDecoder(source io.Reader) *Decoder {
return &Decoder{source: source}
}

// Decode reads the next BSON-encoded value from its input and stores it in the value pointed to by v.
// See the documentation for Unmarshal for details about the conversion of BSON into a Go value.
func (dec *Decoder) Decode(v interface{}) (err error) {
// BSON documents start with their size as a *signed* int32.
var docSize int32
if err = binary.Read(dec.source, binary.LittleEndian, &docSize); err != nil {
return
}

if docSize < MinDocumentSize || docSize > MaxDocumentSize {
return ErrInvalidDocumentSize{DocumentSize: docSize}
}

docBuffer := bytes.NewBuffer(make([]byte, 0, docSize))
if err = binary.Write(docBuffer, binary.LittleEndian, docSize); err != nil {
return
}

// docSize is the *full* document's size (including the 4-byte size header,
// which has already been read).
if _, err = io.CopyN(docBuffer, dec.source, int64(docSize-4)); err != nil {
return
}

// Let Unmarshal handle the rest.
defer handleErr(&err)
return Unmarshal(docBuffer.Bytes(), v)
}

// An Encoder encodes and writes BSON values to an output stream.
type Encoder struct {
target io.Writer
}

// NewEncoder returns a new Encoder that writes to target.
func NewEncoder(target io.Writer) *Encoder {
return &Encoder{target: target}
}

// Encode encodes v to BSON, and if successful writes it to the Encoder's output stream.
// See the documentation for Marshal for details about the conversion of Go values to BSON.
func (enc *Encoder) Encode(v interface{}) error {
data, err := Marshal(v)
if err != nil {
return err
}

_, err = enc.target.Write(data)
return err
}
77 changes: 77 additions & 0 deletions bson/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package bson_test

import (
"bytes"

"github.com/globalsign/mgo/bson"
. "gopkg.in/check.v1"
)

var invalidSizeDocuments = [][]byte{
// Empty document
[]byte{},
// Incomplete header
[]byte{0x04},
// Negative size
[]byte{0xff, 0xff, 0xff, 0xff},
// Full, valid size header but too small (less than 5 bytes)
[]byte{0x04, 0x00, 0x00, 0x00},
// Valid header, valid size but incomplete document
[]byte{0xff, 0x00, 0x00, 0x00, 0x00},
// Too big
[]byte{0xff, 0xff, 0xff, 0x7f},
}

// Reusing sampleItems from bson_test

func (s *S) TestEncodeSampleItems(c *C) {
for i, item := range sampleItems {
buf := bytes.NewBuffer(nil)
enc := bson.NewEncoder(buf)

err := enc.Encode(item.obj)
c.Assert(err, IsNil)
c.Assert(string(buf.Bytes()), Equals, item.data, Commentf("Failed on item %d", i))
}
}

func (s *S) TestDecodeSampleItems(c *C) {
for i, item := range sampleItems {
buf := bytes.NewBuffer([]byte(item.data))
dec := bson.NewDecoder(buf)

value := bson.M{}
err := dec.Decode(&value)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
}
}

func (s *S) TestStreamRoundTrip(c *C) {
buf := bytes.NewBuffer(nil)
enc := bson.NewEncoder(buf)

for _, item := range sampleItems {
err := enc.Encode(item.obj)
c.Assert(err, IsNil)
}

// Ensure that everything that was encoded is decodable in the same order.
dec := bson.NewDecoder(buf)
for i, item := range sampleItems {
value := bson.M{}
err := dec.Decode(&value)
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i))
}
}

func (s *S) TestDecodeDocumentTooSmall(c *C) {
for i, item := range invalidSizeDocuments {
buf := bytes.NewBuffer(item)
dec := bson.NewDecoder(buf)
value := bson.M{}
err := dec.Decode(&value)
c.Assert(err, NotNil, Commentf("Failed on invalid size item %d", i))
}
}

0 comments on commit b5611a5

Please sign in to comment.