forked from ethereum/go-ethereum
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtracker.go
205 lines (179 loc) · 6.86 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package tracker
import (
"container/list"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
// trackedGaugeName is the prefix of the per-packet request tracking.
trackedGaugeName = "p2p/tracked"
// lostMeterName is the prefix of the per-packet request expirations.
lostMeterName = "p2p/lost"
// staleMeterName is the prefix of the per-packet stale responses.
staleMeterName = "p2p/stale"
// waitHistName is the prefix of the per-packet (req only) waiting time histograms.
waitHistName = "p2p/wait"
// maxTrackedPackets is a huge number to act as a failsafe on the number of
// pending requests the node will track. It should never be hit unless an
// attacker figures out a way to spin requests.
maxTrackedPackets = 100000
)
// request tracks sent network requests which have not yet received a response.
type request struct {
peer string
version uint // Protocol version
reqCode uint64 // Protocol message code of the request
resCode uint64 // Protocol message code of the expected response
time time.Time // Timestamp when the request was made
expire *list.Element // Expiration marker to untrack it
}
// Tracker is a pending network request tracker to measure how much time it takes
// a remote peer to respond.
type Tracker struct {
protocol string // Protocol capability identifier for the metrics
timeout time.Duration // Global timeout after which to drop a tracked packet
pending map[uint64]*request // Currently pending requests
expire *list.List // Linked list tracking the expiration order
wake *time.Timer // Timer tracking the expiration of the next item
lock sync.Mutex // Lock protecting from concurrent updates
}
// New creates a new network request tracker to monitor how much time it takes to
// fill certain requests and how individual peers perform.
func New(protocol string, timeout time.Duration) *Tracker {
return &Tracker{
protocol: protocol,
timeout: timeout,
pending: make(map[uint64]*request),
expire: list.New(),
}
}
// Track adds a network request to the tracker to wait for a response to arrive
// or until the request it cancelled or times out.
func (t *Tracker) Track(peer string, version uint, reqCode uint64, resCode uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If there's a duplicate request, we've just random-collided (or more probably,
// we have a bug), report it. We could also add a metric, but we're not really
// expecting ourselves to be buggy, so a noisy warning should be enough.
if _, ok := t.pending[id]; ok {
log.Error("Network request id collision", "protocol", t.protocol, "version", version, "code", reqCode, "id", id)
return
}
// If we have too many pending requests, bail out instead of leaking memory
if pending := len(t.pending); pending >= maxTrackedPackets {
log.Error("Request tracker exceeded allowance", "pending", pending, "peer", peer, "protocol", t.protocol, "version", version, "code", reqCode)
return
}
// Id doesn't exist yet, start tracking it
t.pending[id] = &request{
peer: peer,
version: version,
reqCode: reqCode,
resCode: resCode,
time: time.Now(),
expire: t.expire.PushBack(id),
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, version, reqCode)
metrics.GetOrRegisterGauge(g, nil).Inc(1)
// If we've just inserted the first item, start the expiration timer
if t.wake == nil {
t.wake = time.AfterFunc(t.timeout, t.clean)
}
}
// clean is called automatically when a preset time passes without a response
// being delivered for the first network request.
func (t *Tracker) clean() {
t.lock.Lock()
defer t.lock.Unlock()
// Expire anything within a certain threshold (might be no items at all if
// we raced with the delivery)
for t.expire.Len() > 0 {
// Stop iterating if the next pending request is still alive
var (
head = t.expire.Front()
id = head.Value.(uint64)
req = t.pending[id]
)
if time.Since(req.time) < t.timeout+5*time.Millisecond {
break
}
// Nope, dead, drop it
t.expire.Remove(head)
delete(t.pending, id)
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
m := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
}
t.schedule()
}
// schedule starts a timer to trigger on the expiration of the first network
// packet.
func (t *Tracker) schedule() {
if t.expire.Len() == 0 {
t.wake = nil
return
}
t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean)
}
// Fulfil fills a pending request, if any is available, reporting on various metrics.
func (t *Tracker) Fulfil(peer string, version uint, code uint64, id uint64) {
if !metrics.Enabled {
return
}
t.lock.Lock()
defer t.lock.Unlock()
// If it's a non existing request, track as stale response
req, ok := t.pending[id]
if !ok {
m := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.protocol, version, code)
metrics.GetOrRegisterMeter(m, nil).Mark(1)
return
}
// If the response is funky, it might be some active attack
if req.peer != peer || req.version != version || req.resCode != code {
log.Warn("Network response id collision",
"have", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, version, code),
"want", fmt.Sprintf("%s:%s/%d:%d", peer, t.protocol, req.version, req.resCode),
)
return
}
// Everything matches, mark the request serviced and meter it
t.expire.Remove(req.expire)
delete(t.pending, id)
if req.expire.Prev() == nil {
if t.wake.Stop() {
t.schedule()
}
}
g := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.protocol, req.version, req.reqCode)
metrics.GetOrRegisterGauge(g, nil).Dec(1)
h := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.protocol, req.version, req.reqCode)
sampler := func() metrics.Sample {
return metrics.ResettingSample(
metrics.NewExpDecaySample(1028, 0.015),
)
}
metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(req.time).Microseconds())
}