Skip to content

Commit

Permalink
Improvements in the bulk operations implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemeyer committed Aug 28, 2015
1 parent e6cf61d commit d1c150d
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 39 deletions.
46 changes: 36 additions & 10 deletions bulk.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package mgo

import (
"gopkg.in/mgo.v2-unstable/bson"
)

// Bulk represents an operation that can be prepared with several
// orthogonal changes before being delivered to the server.
//
Expand All @@ -26,6 +30,8 @@ type bulkAction struct {
docs []interface{}
}

type bulkUpdateOp []interface{}

// BulkError holds an error returned from running a Bulk operation.
//
// TODO: This is private for the moment, until we understand exactly how
Expand Down Expand Up @@ -94,7 +100,17 @@ func (b *Bulk) Update(pairs ...interface{}) {
panic("Bulk.Update requires an even number of parameters")
}
action := b.action(bulkUpdate)
action.docs = append(action.docs, pairs...)
for i := 0; i < len(pairs); i += 2 {
selector := pairs[i]
if selector == nil {
selector = bson.D{}
}
action.docs = append(action.docs, &updateOp{
Collection: b.c.FullName,
Selector: selector,
Update: pairs[i+1],
})
}
}

// UpdateAll queues up the provided pairs of updating instructions.
Expand All @@ -105,8 +121,20 @@ func (b *Bulk) UpdateAll(pairs ...interface{}) {
if len(pairs)%2 != 0 {
panic("Bulk.UpdateAll requires an even number of parameters")
}
action := b.action(bulkUpdateAll)
action.docs = append(action.docs, pairs...)
action := b.action(bulkUpdate)
for i := 0; i < len(pairs); i += 2 {
selector := pairs[i]
if selector == nil {
selector = bson.D{}
}
action.docs = append(action.docs, &updateOp{
Collection: b.c.FullName,
Selector: selector,
Update: pairs[i+1],
Flags: 2,
Multi: true,
})
}
}

// Run runs all the operations queued up.
Expand All @@ -121,9 +149,7 @@ func (b *Bulk) Run() (*BulkResult, error) {
case bulkInsert:
ok = b.runInsert(action, &result, &berr)
case bulkUpdate:
ok = b.runUpdate(action, &result, &berr, 0)
case bulkUpdateAll:
ok = b.runUpdate(action, &result, &berr, 2)
ok = b.runUpdate(action, &result, &berr)
default:
panic("unknown bulk operation")
}
Expand All @@ -145,18 +171,18 @@ func (b *Bulk) runInsert(action *bulkAction, result *BulkResult, berr *bulkError
if !b.ordered {
op.flags = 1 // ContinueOnError
}
_, err := b.c.writeQuery(op)
_, err := b.c.writeOp(op, b.ordered)
if err != nil {
berr.err = err
return false
}
return true
}

func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *bulkError, flags uint32) bool {
func (b *Bulk) runUpdate(action *bulkAction, result *BulkResult, berr *bulkError) bool {
ok := true
for i := 0; i < len(action.docs); i += 2 {
_, err := b.c.writeQuery(&updateOp{b.c.FullName, action.docs[i], action.docs[i+1], flags})
for _, op := range action.docs {
_, err := b.c.writeOp(op, b.ordered)
if err != nil {
ok = false
berr.err = &bulkError{err}
Expand Down
87 changes: 68 additions & 19 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,7 +2215,7 @@ func IsDup(err error) bool {
// happens while inserting the provided documents, the returned error will
// be of type *LastError.
func (c *Collection) Insert(docs ...interface{}) error {
_, err := c.writeQuery(&insertOp{c.FullName, docs, 0})
_, err := c.writeOp(&insertOp{c.FullName, docs, 0}, true)
return err
}

Expand All @@ -2231,7 +2231,15 @@ func (c *Collection) Insert(docs ...interface{}) error {
// http://www.mongodb.org/display/DOCS/Atomic+Operations
//
func (c *Collection) Update(selector interface{}, update interface{}) error {
lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 0})
if selector == nil {
selector = bson.D{}
}
op := updateOp{
Collection: c.FullName,
Selector: selector,
Update: update,
}
lerr, err := c.writeOp(&op, true)
if err == nil && lerr != nil && !lerr.UpdatedExisting {
return ErrNotFound
}
Expand Down Expand Up @@ -2267,7 +2275,17 @@ type ChangeInfo struct {
// http://www.mongodb.org/display/DOCS/Atomic+Operations
//
func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 2})
if selector == nil {
selector = bson.D{}
}
op := updateOp{
Collection: c.FullName,
Selector: selector,
Update: update,
Flags: 2,
Multi: true,
}
lerr, err := c.writeOp(&op, true)
if err == nil && lerr != nil {
info = &ChangeInfo{Updated: lerr.N}
}
Expand All @@ -2288,7 +2306,17 @@ func (c *Collection) UpdateAll(selector interface{}, update interface{}) (info *
// http://www.mongodb.org/display/DOCS/Atomic+Operations
//
func (c *Collection) Upsert(selector interface{}, update interface{}) (info *ChangeInfo, err error) {
lerr, err := c.writeQuery(&updateOp{c.FullName, selector, update, 1})
if selector == nil {
selector = bson.D{}
}
op := updateOp{
Collection: c.FullName,
Selector: selector,
Update: update,
Flags: 1,
Upsert: true,
}
lerr, err := c.writeOp(&op, true)
if err == nil && lerr != nil {
info = &ChangeInfo{}
if lerr.UpdatedExisting {
Expand Down Expand Up @@ -2320,7 +2348,7 @@ func (c *Collection) UpsertId(id interface{}, update interface{}) (info *ChangeI
// http://www.mongodb.org/display/DOCS/Removing
//
func (c *Collection) Remove(selector interface{}) error {
lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 1})
lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 1}, true)
if err == nil && lerr != nil && lerr.N == 0 {
return ErrNotFound
}
Expand All @@ -2346,7 +2374,7 @@ func (c *Collection) RemoveId(id interface{}) error {
// http://www.mongodb.org/display/DOCS/Removing
//
func (c *Collection) RemoveAll(selector interface{}) (info *ChangeInfo, err error) {
lerr, err := c.writeQuery(&deleteOp{c.FullName, selector, 0})
lerr, err := c.writeOp(&deleteOp{c.FullName, selector, 0}, true)
if err == nil && lerr != nil {
info = &ChangeInfo{Removed: lerr.N}
}
Expand Down Expand Up @@ -4056,14 +4084,13 @@ type writeCmdResult struct {
} `bson:"writeConcernError"`
}

// writeQuery runs the given modifying operation, potentially followed up
// writeOp runs the given modifying operation, potentially followed up
// by a getLastError command in case the session is in safe mode. The
// LastError result is made available in lerr, and if lerr.Err is set it
// will also be returned as err.
func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
func (c *Collection) writeOp(op interface{}, ordered bool) (lerr *LastError, err error) {
s := c.Database.Session
dbname := c.Database.Name
socket, err := s.acquireSocket(dbname == "local")
socket, err := s.acquireSocket(c.Database.Name == "local")
if err != nil {
return nil, err
}
Expand All @@ -4086,7 +4113,7 @@ func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
l = len(all)
}
op.documents = all[i:l]
_, err := c.writeCommand(socket, safeOp, op)
_, err := c.writeOpCommand(socket, safeOp, op, ordered)
if err != nil {
if op.flags&1 != 0 {
if firstErr == nil {
Expand All @@ -4099,9 +4126,27 @@ func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
}
return nil, firstErr
}
return c.writeCommand(socket, safeOp, op)
return c.writeOpCommand(socket, safeOp, op, ordered)
} else if updateOps, ok := op.(bulkUpdateOp); ok {
var firstErr error
for _, updateOp := range updateOps {
_, err := c.writeOpQuery(socket, safeOp, updateOp, ordered)
if err != nil {
if !ordered {
if firstErr == nil {
firstErr = err
}
} else {
return nil, err
}
}
}
return nil, firstErr
}
return c.writeOpQuery(socket, safeOp, op, ordered)
}

func (c *Collection) writeOpQuery(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
if safeOp == nil {
return nil, socket.Query(op)
}
Expand All @@ -4111,7 +4156,7 @@ func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
var replyErr error
mutex.Lock()
query := *safeOp // Copy the data.
query.collection = dbname + ".$cmd"
query.collection = c.Database.Name + ".$cmd"
query.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
replyData = docData
replyErr = err
Expand Down Expand Up @@ -4141,7 +4186,7 @@ func (c *Collection) writeQuery(op interface{}) (lerr *LastError, err error) {
return result, nil
}

func (c *Collection) writeCommand(socket *mongoSocket, safeOp *queryOp, op interface{}) (lerr *LastError, err error) {
func (c *Collection) writeOpCommand(socket *mongoSocket, safeOp *queryOp, op interface{}, ordered bool) (lerr *LastError, err error) {
var writeConcern interface{}
if safeOp == nil {
writeConcern = bson.D{{"w", 0}}
Expand All @@ -4161,15 +4206,19 @@ func (c *Collection) writeCommand(socket *mongoSocket, safeOp *queryOp, op inter
}
case *updateOp:
// http://docs.mongodb.org/manual/reference/command/update
selector := op.selector
if selector == nil {
selector = bson.D{}
cmd = bson.D{
{"update", c.Name},
{"updates", []interface{}{op}},
{"writeConcern", writeConcern},
{"ordered", ordered},
}
case bulkUpdateOp:
// http://docs.mongodb.org/manual/reference/command/update
cmd = bson.D{
{"update", c.Name},
{"updates", []bson.D{{{"q", selector}, {"u", op.update}, {"upsert", op.flags&1 != 0}, {"multi", op.flags&2 != 0}}}},
{"updates", op},
{"writeConcern", writeConcern},
//{"ordered", <bool>},
{"ordered", ordered},
}
case *deleteOp:
// http://docs.mongodb.org/manual/reference/command/delete
Expand Down
22 changes: 12 additions & 10 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ type insertOp struct {
}

type updateOp struct {
collection string // "database.collection"
selector interface{}
update interface{}
flags uint32
Collection string `bson:"-"` // "database.collection"
Selector interface{} `bson:"q"`
Update interface{} `bson:"u"`
Flags uint32 `bson:"-"`
Multi bool `bson:"multi,omitempty"`
Upsert bool `bson:"upsert,omitempty"`
}

type deleteOp struct {
Expand Down Expand Up @@ -370,15 +372,15 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
case *updateOp:
buf = addHeader(buf, 2001)
buf = addInt32(buf, 0) // Reserved
buf = addCString(buf, op.collection)
buf = addInt32(buf, int32(op.flags))
debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.selector)
buf, err = addBSON(buf, op.selector)
buf = addCString(buf, op.Collection)
buf = addInt32(buf, int32(op.Flags))
debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
buf, err = addBSON(buf, op.Selector)
if err != nil {
return err
}
debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.update)
buf, err = addBSON(buf, op.update)
debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.Update)
buf, err = addBSON(buf, op.Update)
if err != nil {
return err
}
Expand Down

0 comments on commit d1c150d

Please sign in to comment.