-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.go
211 lines (188 loc) · 4.57 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package main
import (
"flag"
"fmt"
"io"
"log"
"net"
"runtime"
"sync"
"time"
)
// Package-level configuration and shared proxy state.
var (
	// Default addresses; each one can be overridden by a command-line flag.
	listenOn = "127.0.0.1:8301" // where client connections are accepted
	proxyTo  = "127.0.0.1:8300" // where accepted connections are forwarded
	statsOn  = "127.0.0.1:8299" // where the stats line is served

	// concurrency is the number of connections allowed to be proxied at once.
	concurrency = 1

	// wCond guards waiting, active and count, and is what clients block on
	// while every proxy slot is taken.
	wCond = sync.NewCond(&sync.Mutex{})

	waiting int    // clients currently blocked waiting for a slot
	active  int    // clients currently holding a slot
	count   uint64 // total clients seen so far; also the source of client IDs

	// concurrencyBucket is declared but not used anywhere in this file.
	concurrencyBucket chan struct{}
)
// client holds the state of a single proxied connection: the two sockets,
// the bookkeeping needed to wait for both copy directions, and the
// timestamps used when the outcome is logged.
type client struct {
	ID   uint64 // sequence number assigned from the global counter in setup
	name string // remote address of the connecting client

	// conn is the accepted client connection; server is the connection
	// dialed to the proxied service (nil when the dial failed).
	conn, server net.Conn

	err     error          // dial error, if any
	w       sync.WaitGroup // tracks the two copy goroutines
	didWait bool           // true if the client had to block for a free slot

	// start is when the client was accepted, waited is when it obtained a
	// slot, dialed is when the upstream connection was established and done
	// is when both copy directions finished.
	start, waited, dialed, done time.Time
}
// copyTo streams everything read from the client connection (c.conn) into
// conn, then marks one of the two copy operations tracked by c.w as finished.
//
// When the copy ends, the write half of conn is shut down if conn supports it
// (for example *net.TCPConn). This forwards the end-of-stream to the peer;
// without it the peer never learns the client stopped sending, and the
// opposite copy direction can stay blocked indefinitely while still holding
// a concurrency slot.
func (c *client) copyTo(conn net.Conn) {
	// Deferred so the WaitGroup is released on every exit path.
	defer c.w.Done()

	// The copy error is ignored, as before: a broken connection simply ends
	// this direction of the proxy.
	_, _ = io.Copy(conn, c.conn)

	// Half-close only; the full Close of both sockets happens in teardown.
	if hc, ok := conn.(interface{ CloseWrite() error }); ok {
		_ = hc.CloseWrite()
	}
}
// copyFrom streams everything read from conn into the client connection
// (c.conn), then marks one of the two copy operations tracked by c.w as
// finished.
//
// When the copy ends, the write half of the client connection is shut down if
// it supports that (for example *net.TCPConn). This forwards the end-of-stream
// to the client; without it a client that waits for EOF after conn has
// finished sending never sees one, and the proxy slot stays occupied.
func (c *client) copyFrom(conn net.Conn) {
	// Deferred so the WaitGroup is released on every exit path.
	defer c.w.Done()

	// The copy error is ignored, as before: a broken connection simply ends
	// this direction of the proxy.
	_, _ = io.Copy(c.conn, conn)

	// Half-close only; the full Close of both sockets happens in teardown.
	if hc, ok := c.conn.(interface{ CloseWrite() error }); ok {
		_ = hc.CloseWrite()
	}
}
// copyAll runs both copy directions between the client and the upstream
// connection concurrently and blocks until each has finished.
func (c *client) copyAll() {
	upstream := c.server

	// One goroutine per direction: client -> upstream and upstream -> client.
	go c.copyTo(upstream)
	go c.copyFrom(upstream)

	// Block until both goroutines have called Done on the WaitGroup.
	c.w.Wait()

	// Stamp the completion time right away so that later bookkeeping is not
	// counted as copy time in the log line.
	c.done = time.Now()
}
// doProxy connects to the proxied service at proxyTo, shuttles data between
// it and the client until both directions finish, and logs the outcome.
// A failed dial is logged as an error and ends the proxy attempt.
func (c *client) doProxy() {
	// Connect to the real TCP service and record both results on the client.
	upstream, dialErr := net.Dial("tcp", proxyTo)
	c.server, c.err = upstream, dialErr
	if dialErr != nil {
		c.logError()
		return
	}

	// The upstream connection is open from here on; it is closed in teardown.
	c.dialed = time.Now()

	c.copyAll()
	c.logSuccess()
}
// logError writes a single log line describing a failed proxy attempt: the
// client name and number, the seconds elapsed since the client was accepted,
// and the message of the error stored in c.err.
func (c *client) logError() {
	elapsed := time.Since(c.start).Seconds()
	reason := c.err.Error()

	log.Printf(
		"client=%s num=%d status=error took=%f message=\"%s\"",
		c.name, c.ID, elapsed, reason)
}
// logSuccess writes a single log line for a completed proxy session. All
// figures are in seconds: total time since accept, time spent waiting for a
// slot, time spent dialing and time spent copying.
func (c *client) logSuccess() {
	finished := time.Now()

	// The wait figure is reported only when the client actually had to block
	// for a slot; otherwise it stays at zero.
	var queued float64
	if c.didWait {
		queued = c.waited.Sub(c.start).Seconds()
	}

	total := finished.Sub(c.start).Seconds()
	dialing := c.dialed.Sub(c.waited).Seconds()
	copying := c.done.Sub(c.dialed).Seconds()

	log.Printf(
		"client=%s num=%d status=success took=%f wait=%f dial=%f copy=%f",
		c.name, c.ID, total, queued, dialing, copying)
}
// setup assigns the client its sequence number and blocks until one of the
// `concurrency` proxy slots is free, then claims that slot. It also registers
// the two copy operations on the client's WaitGroup.
func (c *client) setup() {
	// Two copy goroutines (one per direction) will call Done later.
	c.w.Add(2)

	// Everything below touches shared counters, so hold the condition's lock
	// until the function returns.
	wCond.L.Lock()
	defer wCond.L.Unlock()

	// Take the next client number.
	count++
	c.ID = count

	// Count ourselves as waiting for as long as every slot is in use.
	waiting++
	for active == concurrency {
		c.didWait = true
		// Wait releases the lock while blocked and re-acquires it before
		// returning, so other clients can still make progress.
		wCond.Wait()
	}
	c.waited = time.Now()

	// Move from the waiting set to the active set.
	waiting--
	active++
}
// teardown closes the client's connections, releases its proxy slot and wakes
// one client that is waiting for a slot.
//
// The upstream connection is closed only if it exists: when the dial in
// doProxy fails, c.server is nil, and calling Close on it would panic. That
// panic would take down the process and leave the slot counted as active.
func (c *client) teardown() {
	// Close errors are ignored; there is nothing useful to do with them here.
	_ = c.conn.Close()
	if c.server != nil {
		_ = c.server.Close()
	}

	// Update the shared counter under the condition's lock.
	wCond.L.Lock()
	active--
	wCond.L.Unlock()

	// Wake exactly one waiter, if any; with no waiters this is a no-op.
	wCond.Signal()
}
// mind drives a client through its whole lifecycle, one stage after another.
func (c *client) mind() {
	// Obtain a proxy slot, blocking if none is free.
	c.setup()
	// Dial the upstream service and relay traffic.
	c.doProxy()
	// Close the connections and hand the slot back.
	c.teardown()
}
// handleClient wraps a freshly accepted connection in a client record, named
// after its remote address and stamped with the current time, and runs it
// through the proxy lifecycle.
func handleClient(conn net.Conn) {
	peer := conn.RemoteAddr().String()

	c := new(client)
	c.name = peer
	c.conn = conn
	c.start = time.Now()

	c.mind()
}
// server listens on listenOn and hands every accepted connection to
// handleClient in its own goroutine. It never returns normally: a failure to
// listen or to accept terminates the process via log.Fatal.
func server() {
	// Open the listening TCP socket.
	listener, listenErr := net.Listen("tcp", listenOn)
	if listenErr != nil {
		log.Fatal("net.Listen error: " + listenErr.Error())
	}

	// Accept loop.
	for {
		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			// Any accept failure is treated as unrecoverable.
			log.Fatal("net.Listener.Accept error: " + acceptErr.Error())
		}

		// Proxy the connection concurrently so the loop can keep accepting.
		go handleClient(conn)
	}
}
// stats starts a small TCP service on statsOn. Every connection to it
// receives one line reporting the current number of active and waiting
// clients, after which the connection is closed.
//
// The listener is created synchronously, before any goroutine is launched, so
// a bad stats address aborts the program before the proxy starts serving real
// clients. stats returns once the accept loop is running in the background.
func stats() {
	ln, err := net.Listen("tcp", statsOn)
	if err != nil {
		log.Fatal("net.Listen error: " + err.Error())
	}

	go func(ln net.Listener) {
		// Accept stats clients in a loop.
		for {
			conn, err := ln.Accept()
			if err != nil {
				log.Fatal("net.Listener.Accept error: " + err.Error())
			}

			// Serve each stats client in its own goroutine so the loop gets
			// back to accepting quickly.
			go func(c net.Conn) {
				defer c.Close()

				// active and waiting are modified by proxy goroutines while
				// they hold wCond.L, so take a snapshot under the same lock
				// instead of reading them racily.
				wCond.L.Lock()
				nActive, nWaiting := active, waiting
				wCond.L.Unlock()

				// Write outside the lock so a slow stats client cannot stall
				// the proxy.
				fmt.Fprintf(c, "active: %d, waiting: %d\n", nActive, nWaiting)
			}(conn)
		}
	}(ln)
}
// init registers the command-line flags. Each flag's default is the current
// value of the package-level variable it is bound to.
func init() {
	// The three address flags share the same shape, so register them from a
	// table.
	addrFlags := []struct {
		target *string
		name   string
		usage  string
	}{
		{&listenOn, "l", "Listen for TCP connections at this address"},
		{&proxyTo, "p", "Proxy connected clients to this address"},
		{&statsOn, "s", "Give stats to clients connecting to this address"},
	}
	for _, f := range addrFlags {
		flag.StringVar(f.target, f.name, *f.target, f.usage)
	}

	flag.IntVar(&concurrency, "c", concurrency, "Number of active connections allowed to proxy address at a given time")
}
// main parses the command-line flags, starts the stats service in the
// background and then runs the proxy accept loop in the foreground.
func main() {
	flag.Parse()

	// Allow the scheduler to use every CPU reported by the runtime.
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)

	// stats returns once its listener is up; server blocks for good.
	stats()
	server()
}