Merge v2-unstable into v2.
niemeyer committed Jun 3, 2015
2 parents 01ee097 + 8466119 commit 3f8090a
Showing 4 changed files with 142 additions and 45 deletions.
8 changes: 5 additions & 3 deletions session.go
@@ -3829,7 +3829,7 @@ func (q *Query) Apply(change Change, result interface{}) (info *ChangeInfo, err
if doc.LastError.N == 0 {
return nil, ErrNotFound
}
if doc.Value.Kind != 0x0A {
if doc.Value.Kind != 0x0A && result != nil {
err = doc.Value.Unmarshal(result)
if err != nil {
return nil, err
@@ -4186,8 +4186,10 @@ func (c *Collection) writeCommand(socket *mongoSocket, safeOp *queryOp, op inter
var result writeCmdResult
err = c.Database.run(socket, cmd, &result)
debugf("Write command result: %#v (err=%v)", result, err)
// TODO Should lerr.N be result.NModified on updates?
lerr = &LastError{UpdatedExisting: result.NModified != 0, N: result.N}
lerr = &LastError{
UpdatedExisting: result.N > 0 && len(result.Upserted) == 0,
N: result.N,
}
if len(result.Upserted) > 0 {
lerr.UpsertedId = result.Upserted[0].Id
}
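
A minimal usage sketch of what the writeCommand change above means for callers, not part of this commit: UpdatedExisting is now derived from the matched count (result.N) rather than from NModified, so an update that matches a document but changes nothing still reports an existing document as updated, and Collection.Update no longer turns it into ErrNotFound (the new TestUpdate case below exercises this). The dial address, database and collection names here are illustrative. The other hunk in this file lets Query.Apply skip unmarshalling when a nil result is passed, which the TestFindAndModify addition below relies on.

package main

import (
	"log"

	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	session, err := mgo.Dial("localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	coll := session.DB("test").C("items")
	if err := coll.Insert(bson.M{"k": 42, "n": 42}); err != nil {
		log.Fatal(err)
	}

	// The selector matches but the $set changes nothing. With the new
	// UpdatedExisting computation this no-op update does not come back
	// as ErrNotFound.
	if err := coll.Update(bson.M{"k": 42}, bson.M{"$set": bson.M{"n": 42}}); err != nil {
		log.Fatal(err)
	}
}
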
18 changes: 17 additions & 1 deletion session_test.go
@@ -354,6 +354,10 @@ func (s *S) TestUpdate(c *C) {
c.Assert(err, IsNil)
}

// An update that changes nothing is a no-op and shouldn't return an error.
err = coll.Update(M{"k": 42}, M{"$set": M{"n": 42}})
c.Assert(err, IsNil)

err = coll.Update(M{"k": 42}, M{"$inc": M{"n": 1}})
c.Assert(err, IsNil)

@@ -530,7 +534,12 @@ func (s *S) TestUpdateAll(c *C) {
c.Assert(err, IsNil)
}

info, err := coll.UpdateAll(M{"k": M{"$gt": 42}}, M{"$inc": M{"n": 1}})
// Don't actually modify the documents. Should still report 4 matching updates.
info, err := coll.UpdateAll(M{"k": M{"$gt": 42}}, M{"$unset": M{"missing": 1}})
c.Assert(err, IsNil)
c.Assert(info.Updated, Equals, 4)

info, err = coll.UpdateAll(M{"k": M{"$gt": 42}}, M{"$inc": M{"n": 1}})
c.Assert(err, IsNil)
c.Assert(info.Updated, Equals, 4)

@@ -899,6 +908,13 @@ func (s *S) TestFindAndModify(c *C) {
c.Assert(info.Removed, Equals, 0)
c.Assert(info.UpsertedId, IsNil)

// A nil result parameter should be acceptable.
info, err = coll.Find(M{"n": 43}).Apply(mgo.Change{Update: M{"$unset": M{"missing": 1}}}, nil)
c.Assert(err, IsNil)
c.Assert(info.Updated, Equals, 1)
c.Assert(info.Removed, Equals, 0)
c.Assert(info.UpsertedId, IsNil)

result = M{}
info, err = coll.Find(M{"n": 43}).Apply(mgo.Change{Update: M{"$inc": M{"n": 1}}, ReturnNew: true}, result)
c.Assert(err, IsNil)
80 changes: 39 additions & 41 deletions txn/txn.go
@@ -382,66 +382,64 @@ func (r *Runner) ChangeLog(logc *mgo.Collection) {
func (r *Runner) PurgeMissing(collections ...string) error {
type M map[string]interface{}
type S []interface{}
pipeline := []M{
{"$project": M{"_id": 1, "txn-queue": 1}},
{"$unwind": "$txn-queue"},
{"$sort": M{"_id": 1, "txn-queue": 1}},
//{"$group": M{"_id": M{"$substr": S{"$txn-queue", 0, 24}}, "docids": M{"$push": "$_id"}}},
}

type TRef struct {
DocId interface{} "_id"
TxnId string "txn-queue"
type TDoc struct {
Id interface{} "_id"
TxnQueue []string "txn-queue"
}

found := make(map[bson.ObjectId]bool)

sort.Strings(collections)
for _, collection := range collections {
c := r.tc.Database.C(collection)
iter := c.Pipe(pipeline).Iter()
var tref TRef
for iter.Next(&tref) {
txnId := bson.ObjectIdHex(tref.TxnId[:24])
iter := c.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
var tdoc TDoc
for iter.Next(&tdoc) {
for _, txnToken := range tdoc.TxnQueue {
txnId := bson.ObjectIdHex(txnToken[:24])
if found[txnId] {
continue
}
if r.tc.FindId(txnId).One(nil) == nil {
found[txnId] = true
continue
}
logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tdoc.Id, txnId)
err := c.UpdateId(tdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
if err != nil {
return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
}
}
}
if err := iter.Close(); err != nil {
return fmt.Errorf("transaction queue iteration error for %s: %v", collection, err)
}
}

type StashTDoc struct {
Id docKey "_id"
TxnQueue []string "txn-queue"
}

iter := r.sc.Find(nil).Select(bson.M{"_id": 1, "txn-queue": 1}).Iter()
var stdoc StashTDoc
for iter.Next(&stdoc) {
for _, txnToken := range stdoc.TxnQueue {
txnId := bson.ObjectIdHex(txnToken[:24])
if found[txnId] {
continue
}
if r.tc.FindId(txnId).One(nil) == nil {
found[txnId] = true
continue
}
logf("WARNING: purging from document %s/%v the missing transaction id %s", collection, tref.DocId, txnId)
err := c.UpdateId(tref.DocId, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stdoc.Id.C, stdoc.Id.Id, txnId)
err := r.sc.UpdateId(stdoc.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
if err != nil {
return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
}
}
if err := iter.Close(); err != nil {
return fmt.Errorf("transaction queue iteration error for collection %s: %v", collection, err)
}
}

type StashTRef struct {
Id docKey "_id"
TxnId string "txn-queue"
}

iter := r.sc.Pipe(pipeline).Iter()
var stref StashTRef
for iter.Next(&stref) {
txnId := bson.ObjectIdHex(stref.TxnId[:24])
if found[txnId] {
continue
}
if r.tc.FindId(txnId).One(nil) == nil {
found[txnId] = true
continue
}
logf("WARNING: purging from stash document %s/%v the missing transaction id %s", stref.Id.C, stref.Id.Id, txnId)
err := r.sc.UpdateId(stref.Id, M{"$pull": M{"txn-queue": M{"$regex": "^" + txnId.Hex() + "_*"}}})
if err != nil {
return fmt.Errorf("error purging missing transaction %s: %v", txnId.Hex(), err)
}
}
if err := iter.Close(); err != nil {
return fmt.Errorf("transaction stash iteration error: %v", err)
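
For reference, a short sketch of the txn-queue token format the rewritten loops rely on; this is not part of the diff and the token value below is made up. Each queue entry is the transaction id's 24-character hex form followed by an underscore and a nonce, which is why the id can be recovered from txnToken[:24] and why the $pull updates above match entries with a regex anchored on that hex prefix.

package main

import (
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

func main() {
	// Hypothetical txn-queue entry: "<24-char hex txn id>_<nonce>".
	token := "55efa1f4e339f2d3f0a1b2c3_1a2b3c4d"

	// PurgeMissing recovers the transaction id from the first 24 characters.
	txnId := bson.ObjectIdHex(token[:24])
	fmt.Println(txnId.Hex()) // 55efa1f4e339f2d3f0a1b2c3

	// Entries for that transaction can then be matched with an anchored
	// regex of the kind used in the $pull updates above.
	fmt.Println(bson.M{"$regex": "^" + txnId.Hex() + "_*"})
}
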
81 changes: 81 additions & 0 deletions txn/txn_test.go
@@ -608,6 +608,87 @@ func (s *S) TestTxnQueueStashStressTest(c *C) {
}
}

func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
// This test ensures that PurgeMissing can handle very large
// txn-queue fields. Previous iterations of PurgeMissing would
// trigger a 16MB aggregation pipeline result size limit when run
// against documents or stashes with large numbers of txn-queue
// entries. PurgeMissing no longer uses aggregation pipelines, so it
// avoids this limit.

// The pipeline result size limitation was removed from MongoDB in 2.6, so
// this test is only run against older MongoDB versions.
build, err := s.session.BuildInfo()
c.Assert(err, IsNil)
if build.VersionAtLeast(2, 6) {
c.Skip("This tests a problem that can only happen with MongoDB < 2.6 ")
}

// Insert a single document to work with.
err = s.accounts.Insert(M{"_id": 0, "balance": 100})
c.Assert(err, IsNil)

ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}

// Generate one successful transaction.
good := bson.NewObjectId()
c.Logf("---- Running ops under transaction %q", good.Hex())
err = s.runner.Run(ops, good, nil)
c.Assert(err, IsNil)

// Generate another transaction which will go missing.
missing := bson.NewObjectId()
c.Logf("---- Running ops under transaction %q (which will go missing)", missing.Hex())
err = s.runner.Run(ops, missing, nil)
c.Assert(err, IsNil)

err = s.tc.RemoveId(missing)
c.Assert(err, IsNil)

// Generate a txn-queue on the test document that's large enough
// that it used to cause PurgeMissing to exceed MongoDB's pipeline
// result 16MB size limit (MongoDB 2.4 and older only).
//
// The contents of the txn-queue field don't matter, only that
// it's big enough to trigger the size limit. The required size
// can also be achieved by using multiple documents as long as the
// cumulative size of all the txn-queue fields exceeds the
// pipeline limit. A single document is easier to work with for
// this test, however.
//
// The txn id of the successful transaction is used to fill the
// txn-queue because this takes advantage of a short circuit in
// PurgeMissing, dramatically speeding up the test run time.
const fakeQueueLen = 250000
fakeTxnQueue := make([]string, fakeQueueLen)
token := good.Hex() + "_12345678" // txn id + nonce
for i := 0; i < fakeQueueLen; i++ {
fakeTxnQueue[i] = token
}

err = s.accounts.UpdateId(0, bson.M{
"$set": bson.M{"txn-queue": fakeTxnQueue},
})
c.Assert(err, IsNil)

// PurgeMissing could hit the same pipeline result size limit when
// processing the txn-queue fields of stash documents, so insert
// the large txn-queue there too to ensure that no longer happens.
err = s.sc.Insert(
bson.D{{"c", "accounts"}, {"id", 0}},
bson.M{"txn-queue": fakeTxnQueue},
)
c.Assert(err, IsNil)

c.Logf("---- Purging missing transactions")
err = s.runner.PurgeMissing("accounts")
c.Assert(err, IsNil)
}

func (s *S) TestTxnQueueStressTest(c *C) {
txn.SetChaos(txn.Chaos{
SlowdownChance: 0.3,
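
A rough size estimate for the fake queue in TestPurgeMissingPipelineSizeLimit above, as a sanity check rather than something stated in the source: each token is 33 characters (a 24-character hex id, an underscore and the 8-character nonce), so 250,000 of them is about 8 MB of raw string data; with the old $unwind-based pipeline emitting one result document per entry, the repeated _id and per-document BSON overhead roughly doubles that, enough to push the aggregation result past the 16 MB limit that pre-2.6 MongoDB enforced.
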
