Skip to content

Commit

Permalink
Expose pool limit setting via Session.SetPoolLimit.
Browse files Browse the repository at this point in the history
  • Loading branch information
niemeyer committed Jul 21, 2014
1 parent e15707a commit 114d7e7
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 33 deletions.
13 changes: 6 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
cluster.Unlock()
}

var socketsPerServer = 4096

// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
warnedLimit := false
Expand Down Expand Up @@ -562,12 +560,13 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
continue
}

s, abended, err := server.AcquireSocket(socketsPerServer, socketTimeout)
if err == errSocketLimit {
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
if err == errPoolLimit {
if !warnedLimit {
warnedLimit = true
log("WARNING: Per-server connection limit reached.")
}
time.Sleep(1e8)
time.Sleep(100 * time.Millisecond)
continue
}
if err != nil {
Expand All @@ -582,7 +581,7 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
logf("Cannot confirm server %s as master (%v)", server.Addr, err)
s.Release()
cluster.syncServers()
time.Sleep(1e8)
time.Sleep(100 * time.Millisecond)
continue
}
}
Expand Down
55 changes: 46 additions & 9 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,13 +1151,48 @@ func (s *S) TestRemovalOfClusterMember(c *C) {
c.Log("========== Test succeeded. ==========")
}

func (s *S) TestSocketLimit(c *C) {
func (s *S) TestPoolLimitSimple(c *C) {
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
defer session.Close()

stats := mgo.GetStats()
for stats.MasterConns+stats.SlaveConns != 1 {
stats = mgo.GetStats()
c.Log("Waiting for connection to be established...")
time.Sleep(100 * time.Millisecond)
}

c.Assert(stats.SocketsAlive, Equals, 1)
c.Assert(stats.SocketsInUse, Equals, 0)

// Put one socket in use.
c.Assert(session.Ping(), IsNil)

done := make(chan time.Duration)

// Now block trying to get another one due to the pool limit.
go func() {
copy := session.Copy()
defer copy.Close()
copy.SetPoolLimit(1)
started := time.Now()
c.Check(copy.Ping(), IsNil)
done <- time.Now().Sub(started)
}()

time.Sleep(500 * time.Millisecond)

// Put the one socket back in the pool, freeing it for the copy.
session.Refresh()
delay := <-done
c.Assert(delay > 500 * time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
}

func (s *S) TestPoolLimitMany(c *C) {
if *fast {
c.Skip("-fast")
}
const socketLimit = 64
restore := mgo.HackSocketsPerServer(socketLimit)
defer restore()

session, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
Expand All @@ -1167,17 +1202,19 @@ func (s *S) TestSocketLimit(c *C) {
for stats.MasterConns+stats.SlaveConns != 3 {
stats = mgo.GetStats()
c.Log("Waiting for all connections to be established...")
time.Sleep(5e8)
time.Sleep(500 * time.Millisecond)
}
c.Assert(stats.SocketsAlive, Equals, 3)

const poolLimit = 64
session.SetPoolLimit(poolLimit)

// Consume the whole limit for the master.
var master []*mgo.Session
for i := 0; i < socketLimit; i++ {
for i := 0; i < poolLimit; i++ {
s := session.Copy()
defer s.Close()
err := s.Ping()
c.Assert(err, IsNil)
c.Assert(s.Ping(), IsNil)
master = append(master, s)
}

Expand All @@ -1187,7 +1224,7 @@ func (s *S) TestSocketLimit(c *C) {
master[0].Refresh()
}()

// Now a single ping must block, since it would need another
// Then, a single ping must block, since it would need another
// connection to the master, over the limit. Once the goroutine
// above releases its socket, it should move on.
session.Ping()
Expand Down
9 changes: 0 additions & 9 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@ import (
"time"
)

func HackSocketsPerServer(newLimit int) (restore func()) {
oldLimit := newLimit
restore = func() {
socketsPerServer = oldLimit
}
socketsPerServer = newLimit
return
}

func HackPingDelay(newDelay time.Duration) (restore func()) {
globalMutex.Lock()
defer globalMutex.Unlock()
Expand Down
13 changes: 7 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,18 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *
return server
}

var errSocketLimit = errors.New("per-server connection limit reached")
var errPoolLimit = errors.New("per-server connection limit reached")
var errServerClosed = errors.New("server was closed")

// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
// it will establish a new one. The returned socket is owned by the call site,
// and will return to the cache when the socket has its Release method called
// the same number of times as AcquireSocket + Acquire were called for it.
// If the limit argument is not zero, a socket will only be returned if the
// number of sockets in use for this server is under the provided limit.
func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
// If the poolLimit argument is greater than zero and the number of sockets in
// use in this server is greater than the provided limit, errPoolLimit is
// returned.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
abended = server.abended
Expand All @@ -108,9 +109,9 @@ func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (sock
return nil, abended, errServerClosed
}
n := len(server.unusedSockets)
if limit > 0 && len(server.liveSockets)-n >= limit {
if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit {
server.Unlock()
return nil, false, errSocketLimit
return nil, false, errPoolLimit
}
if n > 0 {
socket = server.unusedSockets[n-1]
Expand Down
25 changes: 23 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Session struct {
sourcedb string
dialCred *Credential
creds []Credential
poolLimit int
}

type Database struct {
Expand Down Expand Up @@ -431,7 +432,12 @@ func parseURL(s string) (*urlInfo, error) {

func newSession(consistency mode, cluster *mongoCluster, timeout time.Duration) (session *Session) {
cluster.Acquire()
session = &Session{cluster_: cluster, syncTimeout: timeout, sockTimeout: timeout}
session = &Session{
cluster_: cluster,
syncTimeout: timeout,
sockTimeout: timeout,
poolLimit: 4096,
}
debugf("New session %p on cluster %p", session, cluster)
session.SetMode(consistency, true)
session.SetSafe(&Safe{})
Expand Down Expand Up @@ -1368,6 +1374,21 @@ func (s *Session) SetCursorTimeout(d time.Duration) {
s.m.Unlock()
}

// SetPoolLimit sets the maximum number of sockets in use in a single server
// before this session will block waiting for a socket to be available.
// The default limit is 4096.
//
// This limit must be set to cover more than any expected workload of the
// application. It is a bad practice and an unsupported use case to use the
// database driver to define the concurrency limit of an application. Prevent
// such concurrency "at the door" instead, by properly restricting the amount
// of used resources and number of goroutines before they are created.
func (s *Session) SetPoolLimit(limit int) {
s.m.Lock()
s.poolLimit = limit
s.m.Unlock()
}

// SetBatch sets the default batch size used when fetching documents from the
// database. It's possible to change this setting on a per-query basis as
// well, using the Query.Batch method.
Expand Down Expand Up @@ -3365,7 +3386,7 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) {
}

// Still not good. We need a new socket.
sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags)
sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 114d7e7

Please sign in to comment.