pool.go (forked from flashmob/go-guerrilla)

package guerrilla

import (
	"errors"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/flashmob/go-guerrilla/log"
	"github.com/flashmob/go-guerrilla/mail"
)

var (
	ErrPoolShuttingDown = errors.New("server pool: shutting down")
)

// Poolable is the interface a struct must implement so that it can be
// managed by the Pool.
type Poolable interface {
	// setTimeout sets the read/write timeout
	setTimeout(t time.Duration)
	// init sets a new connection and client id
	init(c net.Conn, clientID uint64, ep *mail.Pool)
	// getID returns a unique id
	getID() uint64
}
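
// For illustration only, a minimal type satisfying Poolable might look like
// the sketch below (hypothetical; the concrete implementation in this
// package is the client created by NewClient):
//
//	type sketchClient struct {
//		conn net.Conn
//		id   uint64
//	}
//
//	func (s *sketchClient) setTimeout(t time.Duration) {
//		if s.conn != nil {
//			_ = s.conn.SetDeadline(time.Now().Add(t))
//		}
//	}
//
//	func (s *sketchClient) init(c net.Conn, clientID uint64, ep *mail.Pool) {
//		s.conn = c
//		s.id = clientID
//	}
//
//	func (s *sketchClient) getID() uint64 { return s.id }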

// Pool holds Clients. The sem channel bounds how many clients may be
// borrowed at once; the pool channel recycles returned clients so they
// can be reused without reallocation.
type Pool struct {
	// clients that are ready to be borrowed
	pool chan Poolable
	// semaphore to control the maximum number of borrowed clients
	sem chan bool
	// book-keeping of clients that have been lent
	activeClients     lentClients
	isShuttingDownFlg atomic.Value
	poolGuard         sync.Mutex
	ShutdownChan      chan int
}

type lentClients struct {
	m  map[uint64]Poolable
	mu sync.Mutex // guards access to this struct
	wg sync.WaitGroup
}

// mapAll applies the callback to every lent client, holding the lock for
// the duration of the iteration.
func (c *lentClients) mapAll(callback func(p Poolable)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, item := range c.m {
		callback(item)
	}
}

// operation performs an operation on a Poolable item using the callback,
// while holding the lock.
func (c *lentClients) operation(callback func(p Poolable), item Poolable) {
	c.mu.Lock()
	defer c.mu.Unlock()
	callback(item)
}

// NewPool creates a new pool of Clients.
func NewPool(poolSize int) *Pool {
	return &Pool{
		pool:          make(chan Poolable, poolSize),
		sem:           make(chan bool, poolSize),
		activeClients: lentClients{m: make(map[uint64]Poolable, poolSize)},
		ShutdownChan:  make(chan int, 1),
	}
}
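
// A pool is typically created once when the server starts, for example
// (sketch; the pool size of 100 is arbitrary):
//
//	pool := NewPool(100) // at most 100 clients lent out at once
//	pool.Start()         // open the pool for borrowing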

// Start marks the pool as open for borrowing.
func (p *Pool) Start() {
	p.isShuttingDownFlg.Store(false)
}

// ShutdownState locks the pool from borrowing, then notifies all active
// clients to stop accepting commands by lowering their timeout to 1 second.
func (p *Pool) ShutdownState() {
	const aVeryLowTimeout = 1
	p.poolGuard.Lock() // ensure no other goroutine is borrowing right now
	defer p.poolGuard.Unlock()
	p.isShuttingDownFlg.Store(true) // no more borrowing
	p.ShutdownChan <- 1             // release any Borrow waiting on p.sem
	// set a low timeout so lent clients wind down quickly
	p.activeClients.mapAll(func(client Poolable) {
		client.setTimeout(aVeryLowTimeout * time.Second)
	})
}

// ShutdownWait blocks until all lent clients have been returned, then
// resets the pool so it can be started again.
func (p *Pool) ShutdownWait() {
	p.poolGuard.Lock() // ensure no other goroutine is borrowing right now
	defer p.poolGuard.Unlock()
	p.activeClients.wg.Wait() // wait for clients to finish
	if len(p.ShutdownChan) > 0 {
		// drain the un-consumed shutdown signal
		<-p.ShutdownChan
	}
	p.isShuttingDownFlg.Store(false)
}
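
// A graceful shutdown is the two calls above in sequence (sketch):
//
//	pool.ShutdownState() // stop lending, lower active client timeouts
//	pool.ShutdownWait()  // block until every borrowed client is returned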

// IsShuttingDown returns true if the pool is shutting down.
func (p *Pool) IsShuttingDown() bool {
	if value, ok := p.isShuttingDownFlg.Load().(bool); ok {
		return value
	}
	return false
}

// SetTimeout sets a timeout for all lent clients.
func (p *Pool) SetTimeout(duration time.Duration) {
	p.activeClients.mapAll(func(client Poolable) {
		client.setTimeout(duration)
	})
}

// GetActiveClientsCount returns the number of clients that are currently
// out of the pool and busy serving.
func (p *Pool) GetActiveClientsCount() int {
	return len(p.sem)
}

// Borrow a Client from the pool. Blocks if the number of borrowed clients
// has reached poolSize, until a client is returned or the pool shuts down.
func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mail.Pool) (Poolable, error) {
	p.poolGuard.Lock()
	defer p.poolGuard.Unlock()
	var c Poolable
	if isShuttingDown, ok := p.isShuttingDownFlg.Load().(bool); ok && isShuttingDown {
		// pool is shutting down.
		return c, ErrPoolShuttingDown
	}
	select {
	case p.sem <- true: // block the client from serving until there is room
		select {
		case c = <-p.pool:
			// reuse a previously returned client
			c.init(conn, clientID, ep)
		default:
			// pool is empty; make a new client
			c = NewClient(conn, clientID, logger, ep)
		}
		p.activeClientsAdd(c)
	case <-p.ShutdownChan: // unblock p.sem when shutting down
		// pool is shutting down.
		return c, ErrPoolShuttingDown
	}
	return c, nil
}

// Return returns a Client back to the pool.
func (p *Pool) Return(c Poolable) {
	p.activeClientsRemove(c)
	select {
	case p.pool <- c:
	default:
		// the pool is full; let the client be garbage collected.
		// hasta la vista, baby...
	}
	<-p.sem // make room for the next serving client
}
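
// Together, Borrow and Return support a typical accept loop (sketch; the
// listener, logger, envelopePool, and clientID variables are assumptions,
// not part of this file):
//
//	for {
//		conn, err := listener.Accept()
//		if err != nil {
//			break
//		}
//		clientID++
//		c, err := pool.Borrow(conn, clientID, logger, envelopePool)
//		if err != nil {
//			// ErrPoolShuttingDown: stop accepting connections
//			_ = conn.Close()
//			break
//		}
//		go func(c Poolable) {
//			// ... serve the client, then hand it back ...
//			pool.Return(c)
//		}(c)
//	}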

// activeClientsAdd records a client as lent out.
func (p *Pool) activeClientsAdd(c Poolable) {
	p.activeClients.operation(func(item Poolable) {
		p.activeClients.wg.Add(1)
		p.activeClients.m[item.getID()] = item
	}, c)
}

// activeClientsRemove removes a client from the lent-out book-keeping.
func (p *Pool) activeClientsRemove(c Poolable) {
	p.activeClients.operation(func(item Poolable) {
		delete(p.activeClients.m, item.getID())
		p.activeClients.wg.Done()
	}, c)
}