Skip to content

Commit

Permalink
Fixed bulk error improvements for MongoDB <2.6.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemeyer committed Oct 5, 2015
1 parent 6227a67 commit 89612db
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 25 deletions.
9 changes: 6 additions & 3 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func (b *Bulk) Upsert(pairs ...interface{}) {
}

// Run runs all the operations queued up.
//
// If an error is reported on an unordered bulk operation, the error value may
// be an aggregation of all issues observed. As an exception to that, Insert
// operations running on MongoDB versions prior to 2.6 will report the last
// error only due to a limitation in the wire protocol.
func (b *Bulk) Run() (*BulkResult, error) {
var result BulkResult
var berr bulkError
Expand Down Expand Up @@ -242,9 +247,7 @@ func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *bulkError

func (b *Bulk) checkSuccess(berr *bulkError, lerr *LastError, err error) bool {
if lerr != nil && len(lerr.errors) > 0 {
for _, e := range lerr.errors {
berr.errs = append(berr.errs, &QueryError{Code: e.Code, Message: e.ErrMsg})
}
berr.errs = append(berr.errs, lerr.errors...)
return false
} else if err != nil {
berr.errs = append(berr.errs, err)
Expand Down
16 changes: 14 additions & 2 deletions bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,27 @@ func (s *S) TestBulkError(c *C) {
bulk.Unordered()
bulk.Insert(M{"_id": "dupone"}, M{"_id": "dupone"}, M{"_id": "duptwo"}, M{"_id": "duptwo"})
_, err = bulk.Run()
c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*dupone.*\n - .*duplicate.*duptwo.*\n$")
if s.versionAtLeast(2, 6) {
c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n( - .*duplicate.*\n){2}$")
c.Assert(err, ErrorMatches, "(?s).*dupone.*")
c.Assert(err, ErrorMatches, "(?s).*duptwo.*")
} else {
// Wire protocol query doesn't return all errors.
c.Assert(err, ErrorMatches, ".*duplicate.*")
}
c.Assert(mgo.IsDup(err), Equals, true)

// With mixed errors, present them all.
bulk = coll.Bulk()
bulk.Unordered()
bulk.Insert(M{"_id": 1}, M{"_id": []int{2}})
_, err = bulk.Run()
c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*\n - .*array.*\n$")
if s.versionAtLeast(2, 6) {
c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*\n - .*array.*\n$")
} else {
// Wire protocol query doesn't return all errors.
c.Assert(err, ErrorMatches, ".*array.*")
}
c.Assert(mgo.IsDup(err), Equals, false)
}

Expand Down
48 changes: 28 additions & 20 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,7 @@ type LastError struct {
UpsertedId interface{} `bson:"upserted"`

modified int
errors []writeCmdError
errors []error
}

func (err *LastError) Error() string {
Expand Down Expand Up @@ -4203,6 +4203,14 @@ type writeCmdError struct {
ErrMsg string
}

func (r *writeCmdResult) QueryErrors() []error {
var errs []error
for _, err := range r.Errors {
errs = append(errs, &QueryError{Code: err.Code, Message: err.ErrMsg})
}
return errs
}

// writeOp runs the given modifying operation, potentially followed up
// by a getLastError command in case the session is in safe mode. The
// LastError result is made available in lerr, and if lerr.Err is set it
Expand All @@ -4222,7 +4230,7 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
if socket.ServerInfo().MaxWireVersion >= 2 {
// Servers with a more recent write protocol benefit from write commands.
if op, ok := op.(*insertOp); ok && len(op.documents) > 1000 {
var firstErr error
var errors []error
// Maximum batch size is 1000. Must split out in separate operations for compatibility.
all := op.documents
for i := 0; i < len(all); i += 1000 {
Expand All @@ -4231,35 +4239,35 @@ func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err
l = len(all)
}
op.documents = all[i:l]
_, err := c.writeOpCommand(socket, safeOp, op, ordered)
lerr, err := c.writeOpCommand(socket, safeOp, op, ordered)
if err != nil {
if op.flags&1 != 0 {
if firstErr == nil {
firstErr = err
}
} else {
return nil, err
errors = append(errors, lerr.errors...)
if op.flags&1 == 0 {
return &LastError{errors: errors}, err
}
}
}
return nil, firstErr
if len(errors) == 0 {
return nil, nil
}
return &LastError{errors: errors}, errors[0]
}
return c.writeOpCommand(socket, safeOp, op, ordered)
} else if updateOps, ok := op.(bulkUpdateOp); ok {
var firstErr error
var errors []error
for _, updateOp := range updateOps {
_, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
lerr, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
if err != nil {
if !ordered {
if firstErr == nil {
firstErr = err
}
} else {
return nil, err
errors = append(errors, lerr.errors...)
if ordered {
return &LastError{errors: errors}, err
}
}
}
return nil, firstErr
if len(errors) == 0 {
return nil, nil
}
return &LastError{errors: errors}, errors[0]
}
return c.writeOpQuery(socket, safeOp, op, ordered)
}
Expand Down Expand Up @@ -4360,7 +4368,7 @@ func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op int
N: result.N,

modified: result.NModified,
errors: result.Errors,
errors: result.QueryErrors(),
}
if len(result.Upserted) > 0 {
lerr.UpsertedId = result.Upserted[0].Id
Expand Down

0 comments on commit 89612db

Please sign in to comment.