From 114d7e7b1789ac06deb2ba16a96dff5f34f4c7b0 Mon Sep 17 00:00:00 2001 From: Gustavo Niemeyer Date: Mon, 21 Jul 2014 11:28:12 -0300 Subject: [PATCH] Expose pool limit setting via Session.SetPoolLimit. --- cluster.go | 13 ++++++------ cluster_test.go | 55 +++++++++++++++++++++++++++++++++++++++++-------- export_test.go | 9 -------- server.go | 13 ++++++------ session.go | 25 ++++++++++++++++++++-- 5 files changed, 82 insertions(+), 33 deletions(-) diff --git a/cluster.go b/cluster.go index 85cc76af1..3891bbfee 100644 --- a/cluster.go +++ b/cluster.go @@ -515,12 +515,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { cluster.Unlock() } -var socketsPerServer = 4096 - // AcquireSocket returns a socket to a server in the cluster. If slaveOk is // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. -func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D) (s *mongoSocket, err error) { +func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { var started time.Time var syncCount uint warnedLimit := false @@ -562,12 +560,13 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati continue } - s, abended, err := server.AcquireSocket(socketsPerServer, socketTimeout) - if err == errSocketLimit { + s, abended, err := server.AcquireSocket(poolLimit, socketTimeout) + if err == errPoolLimit { if !warnedLimit { + warnedLimit = true log("WARNING: Per-server connection limit reached.") } - time.Sleep(1e8) + time.Sleep(100 * time.Millisecond) continue } if err != nil { @@ -582,7 +581,7 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati logf("Cannot confirm server %s as master (%v)", server.Addr, err) s.Release() cluster.syncServers() - time.Sleep(1e8) + time.Sleep(100 * time.Millisecond) continue } } diff --git a/cluster_test.go b/cluster_test.go index 54f44908e..69519a2d9 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1151,13 +1151,48 @@ func (s *S) TestRemovalOfClusterMember(c *C) { c.Log("========== Test succeeded. ==========") } -func (s *S) TestSocketLimit(c *C) { +func (s *S) TestPoolLimitSimple(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + stats := mgo.GetStats() + for stats.MasterConns+stats.SlaveConns != 1 { + stats = mgo.GetStats() + c.Log("Waiting for connection to be established...") + time.Sleep(100 * time.Millisecond) + } + + c.Assert(stats.SocketsAlive, Equals, 1) + c.Assert(stats.SocketsInUse, Equals, 0) + + // Put one socket in use. + c.Assert(session.Ping(), IsNil) + + done := make(chan time.Duration) + + // Now block trying to get another one due to the pool limit. + go func() { + copy := session.Copy() + defer copy.Close() + copy.SetPoolLimit(1) + started := time.Now() + c.Check(copy.Ping(), IsNil) + done <- time.Now().Sub(started) + }() + + time.Sleep(500 * time.Millisecond) + + // Put the one socket back in the pool, freeing it for the copy. + session.Refresh() + delay := <-done + c.Assert(delay > 500 * time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) +} + +func (s *S) TestPoolLimitMany(c *C) { if *fast { c.Skip("-fast") } - const socketLimit = 64 - restore := mgo.HackSocketsPerServer(socketLimit) - defer restore() session, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) @@ -1167,17 +1202,19 @@ func (s *S) TestSocketLimit(c *C) { for stats.MasterConns+stats.SlaveConns != 3 { stats = mgo.GetStats() c.Log("Waiting for all connections to be established...") - time.Sleep(5e8) + time.Sleep(500 * time.Millisecond) } c.Assert(stats.SocketsAlive, Equals, 3) + const poolLimit = 64 + session.SetPoolLimit(poolLimit) + // Consume the whole limit for the master. var master []*mgo.Session - for i := 0; i < socketLimit; i++ { + for i := 0; i < poolLimit; i++ { s := session.Copy() defer s.Close() - err := s.Ping() - c.Assert(err, IsNil) + c.Assert(s.Ping(), IsNil) master = append(master, s) } @@ -1187,7 +1224,7 @@ func (s *S) TestSocketLimit(c *C) { master[0].Refresh() }() - // Now a single ping must block, since it would need another + // Then, a single ping must block, since it would need another // connection to the master, over the limit. Once the goroutine // above releases its socket, it should move on. session.Ping() diff --git a/export_test.go b/export_test.go index b6bfcbc73..690f84d38 100644 --- a/export_test.go +++ b/export_test.go @@ -4,15 +4,6 @@ import ( "time" ) -func HackSocketsPerServer(newLimit int) (restore func()) { - oldLimit := newLimit - restore = func() { - socketsPerServer = oldLimit - } - socketsPerServer = newLimit - return -} - func HackPingDelay(newDelay time.Duration) (restore func()) { globalMutex.Lock() defer globalMutex.Unlock() diff --git a/server.go b/server.go index cc880e223..c86d22642 100644 --- a/server.go +++ b/server.go @@ -89,7 +89,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) * return server } -var errSocketLimit = errors.New("per-server connection limit reached") +var errPoolLimit = errors.New("per-server connection limit reached") var errServerClosed = errors.New("server was closed") // AcquireSocket returns a socket for communicating with the server. @@ -97,9 +97,10 @@ var errServerClosed = errors.New("server was closed") // it will establish a new one. The returned socket is owned by the call site, // and will return to the cache when the socket has its Release method called // the same number of times as AcquireSocket + Acquire were called for it. -// If the limit argument is not zero, a socket will only be returned if the -// number of sockets in use for this server is under the provided limit. -func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { +// If the poolLimit argument is greater than zero and the number of sockets in +// use in this server is greater than the provided limit, errPoolLimit is +// returned. +func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { for { server.Lock() abended = server.abended @@ -108,9 +109,9 @@ func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (sock return nil, abended, errServerClosed } n := len(server.unusedSockets) - if limit > 0 && len(server.liveSockets)-n >= limit { + if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit { server.Unlock() - return nil, false, errSocketLimit + return nil, false, errPoolLimit } if n > 0 { socket = server.unusedSockets[n-1] diff --git a/session.go b/session.go index e2a8a725d..50f367e7c 100644 --- a/session.go +++ b/session.go @@ -70,6 +70,7 @@ type Session struct { sourcedb string dialCred *Credential creds []Credential + poolLimit int } type Database struct { @@ -431,7 +432,12 @@ func parseURL(s string) (*urlInfo, error) { func newSession(consistency mode, cluster *mongoCluster, timeout time.Duration) (session *Session) { cluster.Acquire() - session = &Session{cluster_: cluster, syncTimeout: timeout, sockTimeout: timeout} + session = &Session{ + cluster_: cluster, + syncTimeout: timeout, + sockTimeout: timeout, + poolLimit: 4096, + } debugf("New session %p on cluster %p", session, cluster) session.SetMode(consistency, true) session.SetSafe(&Safe{}) @@ -1368,6 +1374,21 @@ func (s *Session) SetCursorTimeout(d time.Duration) { s.m.Unlock() } +// SetPoolLimit sets the maximum number of sockets in use in a single server +// before this session will block waiting for a socket to be available. +// The default limit is 4096. +// +// This limit must be set to cover more than any expected workload of the +// application. It is a bad practice and an unsupported use case to use the +// database driver to define the concurrency limit of an application. Prevent +// such concurrency "at the door" instead, by properly restricting the amount +// of used resources and number of goroutines before they are created. +func (s *Session) SetPoolLimit(limit int) { + s.m.Lock() + s.poolLimit = limit + s.m.Unlock() +} + // SetBatch sets the default batch size used when fetching documents from the // database. It's possible to change this setting on a per-query basis as // well, using the Query.Batch method. @@ -3365,7 +3386,7 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { } // Still not good. We need a new socket. - sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags) + sock, err := s.cluster().AcquireSocket(slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) if err != nil { return nil, err }