Skip to content

Commit

Permalink
Fix Secondary mode over mongos.
Browse files Browse the repository at this point in the history
Reported by Gabriel Russell.
  • Loading branch information
niemeyer committed Apr 24, 2016
1 parent 8271275 commit 2451555
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 1 deletion.
5 changes: 4 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout
mastersLen := cluster.masters.Len()
slavesLen := cluster.servers.Len() - mastersLen
debugf("Cluster has %d known masters and %d known slaves.", mastersLen, slavesLen)
if !(slaveOk && mode == Secondary) && mastersLen > 0 || slaveOk && slavesLen > 0 {
if mastersLen > 0 && !(slaveOk && mode == Secondary) || slavesLen > 0 && slaveOk {
break
}
if mastersLen > 0 && mode == Secondary && cluster.masters.HasMongos() {
break
}
if started.IsZero() {
Expand Down
112 changes: 112 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1366,6 +1366,118 @@ func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) {
c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
}

func (s *S) TestSecondaryModeWithMongos(c *C) {
session, err := mgo.Dial("localhost:40021")
c.Assert(err, IsNil)
defer session.Close()

ssresult := &struct{ Host string }{}
imresult := &struct{ IsMaster bool }{}

// Figure the master while still using the strong session.
err = session.Run("serverStatus", ssresult)
c.Assert(err, IsNil)
err = session.Run("isMaster", imresult)
c.Assert(err, IsNil)
master := ssresult.Host
c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))

// Ensure mongos is aware about the current topology.
s.Stop(":40201")
s.StartAll()

mongos, err := mgo.Dial("localhost:40202")
c.Assert(err, IsNil)
defer mongos.Close()

mongos.SetSyncTimeout(5 * time.Second)

// Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
c.Assert(err, IsNil)

// Wait until all servers see the data.
for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
session, err := mgo.Dial(addr + "?connect=direct")
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Monotonic, true)
for i := 300; i >= 0; i-- {
n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
c.Assert(err, IsNil)
if n == 1 {
break
}
if i == 0 {
c.Fatalf("Inserted data never reached " + addr)
}
time.Sleep(100 * time.Millisecond)
}
}

// Collect op counters for everyone.
q21a := s.countQueries(c, "localhost:40021")
q22a := s.countQueries(c, "localhost:40022")
q23a := s.countQueries(c, "localhost:40023")

// Do a Secondary query through MongoS

mongos.SetMode(mgo.Secondary, true)

coll := mongos.DB("mydb").C("mycoll")
var result struct{ N int }
for i := 0; i != 5; i++ {
err = coll.Find(nil).One(&result)
c.Assert(err, IsNil)
c.Assert(result.N, Equals, 1)
}

// Collect op counters for everyone again.
q21b := s.countQueries(c, "localhost:40021")
q22b := s.countQueries(c, "localhost:40022")
q23b := s.countQueries(c, "localhost:40023")

var masterDelta, slaveDelta int
switch hostPort(master) {
case "40021":
masterDelta = q21b - q21a
slaveDelta = (q22b - q22a) + (q23b - q23a)
case "40022":
masterDelta = q22b - q22a
slaveDelta = (q21b - q21a) + (q23b - q23a)
case "40023":
masterDelta = q23b - q23a
slaveDelta = (q21b - q21a) + (q22b - q22a)
default:
c.Fatal("Uh?")
}

c.Check(masterDelta, Equals, 0) // Just the counting itself.
c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
}

func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
if *fast {
c.Skip("-fast")
}

session, err := mgo.Dial("localhost:40202")
c.Assert(err, IsNil)
defer session.Close()

session.SetMode(mgo.Secondary, true)
session.SetSyncTimeout(4 * time.Second)

coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"a": 1})
c.Assert(err, IsNil)

var result struct{ A int }
coll.Find(nil).One(&result)
c.Assert(result.A, Equals, 1)
}


func (s *S) TestRemovalOfClusterMember(c *C) {
if *fast {
c.Skip("-fast")
Expand Down
11 changes: 11 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,15 @@ func (servers *mongoServers) Empty() bool {
return len(servers.slice) == 0
}

func (servers *mongoServers) HasMongos() bool {
for _, s := range servers.slice {
if s.Info().Mongos {
return true
}
}
return false
}

// BestFit returns the best guess of what would be the most interesting
// server to perform operations on at this point in time.
func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServer {
Expand All @@ -421,6 +430,8 @@ func (servers *mongoServers) BestFit(mode Mode, serverTags []bson.D) *mongoServe
switch {
case serverTags != nil && !next.info.Mongos && !next.hasTags(serverTags):
// Must have requested tags.
case mode == Secondary && next.info.Master && !next.info.Mongos:
// Must be a secondary or mongos.
case next.info.Master != best.info.Master && mode != Nearest:
// Prefer slaves, unless the mode is PrimaryPreferred.
swap = (mode == PrimaryPreferred) != best.info.Master
Expand Down

0 comments on commit 2451555

Please sign in to comment.