forked from buger/goreplay
-
Notifications
You must be signed in to change notification settings - Fork 0
/
input_tcp.go
143 lines (125 loc) · 3.13 KB
/
input_tcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package goreplay
import (
"bufio"
"bytes"
"crypto/tls"
"fmt"
"io"
"log"
"net"
)
// TCPInput used for internal communication.
//
// It listens on a TCP address, optionally over TLS depending on config, and
// hands the messages read from accepted connections to PluginRead through
// the data channel.
type TCPInput struct {
	// data queues messages assembled by handleConnection until PluginRead
	// returns them.
	data chan *Message
	// listener accepts incoming connections. It is created by listen and
	// closed by Close.
	listener net.Listener
	// address is the listen address given to NewTCPInput; String reports it.
	address string
	// config holds the TLS settings consulted by listen.
	config *TCPInputConfig
	// stop is closed by Close; PluginRead returns ErrorStopped once that
	// has happened.
	stop chan bool // Channel used only to indicate goroutine should shutdown
}
// TCPInputConfig represents configuration of a TCP input plugin.
type TCPInputConfig struct {
	// Secure selects a TLS listener instead of a plain TCP listener.
	Secure bool `json:"input-tcp-secure"`
	// CertificatePath is the certificate file handed to tls.LoadX509KeyPair.
	// It is only read when Secure is true.
	CertificatePath string `json:"input-tcp-certificate"`
	// KeyPath is the private key file handed to tls.LoadX509KeyPair.
	// It is only read when Secure is true.
	KeyPath string `json:"input-tcp-certificate-key"`
}
// NewTCPInput builds a TCPInput for address (host:port) and immediately
// starts listening on it.
//
// config selects between a plain and a TLS listener and must not be nil,
// because listen dereferences it. The message queue is buffered for 1000
// entries. Listening failures do not come back as an error: listen ends the
// process with log.Fatalln.
func NewTCPInput(address string, config *TCPInputConfig) (i *TCPInput) {
	i = &TCPInput{
		data:    make(chan *Message, 1000),
		address: address,
		config:  config,
		stop:    make(chan bool),
	}
	i.listen(address)
	return i
}
// PluginRead blocks until either a message is available on the input's queue
// or the input has been closed.
//
// It returns the next queued message with a nil error, or a nil message with
// ErrorStopped once the stop channel is closed. When both are ready at the
// same time, select picks one of them at random.
func (i *TCPInput) PluginRead() (msg *Message, err error) {
	select {
	case msg = <-i.data:
		// msg is set; err keeps its zero value.
	case <-i.stop:
		err = ErrorStopped
	}
	return msg, err
}
// Close shuts the input down. It closes the stop channel, after which
// PluginRead returns ErrorStopped, and closes the listener, which makes the
// pending Accept in the accept loop fail so that the loop exits.
//
// Calling Close again after it has returned is a no-op that returns nil,
// instead of panicking on a second close of the stop channel. The check is
// not atomic, so calling Close concurrently from several goroutines is still
// unsupported.
//
// It returns the error reported by the listener's Close, if any.
func (i *TCPInput) Close() error {
	select {
	case <-i.stop:
		// Nothing in this file ever sends on stop, so a successful receive
		// means the channel was already closed by an earlier Close.
		return nil
	default:
	}
	close(i.stop)
	return i.listener.Close()
}
// listen opens the listening socket on address, stores it in i.listener and
// starts accepting connections in a background goroutine.
//
// When i.config.Secure is set, the certificate/key pair named in the config
// is loaded and a TLS listener is created; otherwise a plain TCP listener is
// used. Failing to load the key pair or to bind the address ends the process
// through log.Fatalln.
//
// Each accepted connection is served by handleConnection in its own
// goroutine. Temporary accept errors are retried immediately. Any other
// error ends the accept loop; it is reported through Debug only when it is a
// *net.OpError whose inner error is not "use of closed network connection".
func (i *TCPInput) listen(address string) {
	var ln net.Listener
	if !i.config.Secure {
		plain, err := net.Listen("tcp", address)
		if err != nil {
			log.Fatalln("failed to start INPUT-TCP listener:", err)
		}
		ln = plain
	} else {
		keyPair, err := tls.LoadX509KeyPair(i.config.CertificatePath, i.config.KeyPath)
		if err != nil {
			log.Fatalln("error while loading --input-tcp TLS certificate:", err)
		}
		tlsConfig := &tls.Config{Certificates: []tls.Certificate{keyPair}}
		secure, err := tls.Listen("tcp", address, tlsConfig)
		if err != nil {
			log.Fatalln("[INPUT-TCP] failed to start INPUT-TCP listener:", err)
		}
		ln = secure
	}
	i.listener = ln

	go func() {
		for {
			conn, acceptErr := i.listener.Accept()
			switch {
			case acceptErr == nil:
				go i.handleConnection(conn)
			case isTemporaryNetworkError(acceptErr):
				// Transient failure: go straight back to Accept.
			default:
				opErr, isOpErr := acceptErr.(*net.OpError)
				if isOpErr && opErr.Err.Error() != "use of closed network connection" {
					Debug(0, fmt.Sprintf("[INPUT-TCP] listener closed, err: %q", acceptErr))
				}
				return
			}
		}
	}()
}
// payloadSeparatorAsBytes is payloadSeparator converted to a byte slice once,
// at package initialization. handleConnection compares every received line
// against it with the first byte skipped.
var payloadSeparatorAsBytes = []byte(payloadSeparator)
// handleConnection reads payloads from conn and queues each one on i.data
// until the connection reaches EOF, fails with a non-temporary error, or the
// input is closed. conn is closed on return.
//
// The stream is read line by line. Lines are accumulated until one equals
// payloadSeparatorAsBytes[1:]; the accumulated bytes are then split into
// Meta and Data by payloadMetaWithBody and sent as one Message. The skipped
// first separator byte is presumably the '\n' that ends the previous line
// (the original comment says so; payloadSeparator is defined elsewhere), so
// that trailing byte is removed from the payload before it is split.
//
// Bytes received after the last separator are discarded when the connection
// ends.
func (i *TCPInput) handleConnection(conn net.Conn) {
	defer conn.Close()

	reader := bufio.NewReader(conn)
	var (
		buffer  bytes.Buffer // lines of the payload currently being assembled
		partial []byte       // start of a line cut short by a temporary error
	)
	for {
		chunk, err := reader.ReadBytes('\n')
		if err != nil {
			if isTemporaryNetworkError(err) {
				// ReadBytes returns the bytes it read before the error.
				// Keep them so the line is complete once reading resumes.
				partial = append(partial, chunk...)
				continue
			}
			if err != io.EOF {
				Debug(0, fmt.Sprintf("[INPUT-TCP] connection error: %q", err))
			}
			return
		}

		line := chunk
		if len(partial) > 0 {
			line = append(partial, chunk...)
			partial = nil
		}

		if !bytes.Equal(payloadSeparatorAsBytes[1:], line) {
			buffer.Write(line)
			continue
		}

		// Strip the '\n' in front of the separator. Every line written to
		// buffer ends in '\n', so the last byte is that newline.
		// bytes.Buffer.UnreadByte cannot do this: it only undoes a read and
		// fails after a Write or Reset.
		if n := buffer.Len(); n > 0 {
			buffer.Truncate(n - 1)
		}

		// Copy the payload out before reusing the buffer. Bytes() aliases
		// storage that Reset keeps and the next Write overwrites, while the
		// message may still be queued or in use by its consumer.
		payload := append([]byte(nil), buffer.Bytes()...)
		buffer.Reset()

		var msg Message
		msg.Meta, msg.Data = payloadMetaWithBody(payload)
		select {
		case i.data <- &msg:
		case <-i.stop:
			// The input was closed. Return rather than block forever on a
			// full queue, which would leak this goroutine and conn.
			return
		}
	}
}
// String implements fmt.Stringer. It identifies the input by the address it
// was created with.
func (i *TCPInput) String() string {
	const label = "TCP input: "
	return label + i.address
}
// isTemporaryNetworkError reports whether err is a net.Error whose Temporary
// method returns true, which callers in this file treat as "retry the
// operation".
//
// *net.OpError needs no separate check: it implements net.Error, so the
// interface assertion already covers it. err is inspected as given; wrapped
// errors are not unwrapped. A nil err yields false.
func isTemporaryNetworkError(err error) bool {
	nerr, ok := err.(net.Error)
	return ok && nerr.Temporary()
}