Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed May 8, 2019
1 parent c22776b commit c340cca
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 18 deletions.
1 change: 1 addition & 0 deletions readloop_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (l *Listener) monitor() {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
}
} else {
l.Close()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions readloop_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (l *Listener) monitor() {
}
}
} else {
l.Close()
return
}
}
Expand Down
22 changes: 14 additions & 8 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (s *UDPSession) uncork() {
if s.l != nil {
select {
case s.l.chTxQueue <- txqueue:
case <-s.die:
case <-s.l.die:
}
} else {
select {
Expand Down Expand Up @@ -700,10 +700,13 @@ type (
chAccepts chan *UDPSession // Listen() backlog
chSessionClosed chan net.Addr // session close queue
headerSize int // the additional header to a KCP frame
die chan struct{} // notify the listener has closed
rd atomic.Value // read deadline for Accept()
wd atomic.Value
chTxQueue chan []ipv4.Message

die chan struct{} // notify the listener has closed
dieOnce sync.Once

rd atomic.Value // read deadline for Accept()
wd atomic.Value
chTxQueue chan []ipv4.Message
}
)

Expand Down Expand Up @@ -830,9 +833,12 @@ func (l *Listener) SetWriteDeadline(t time.Time) error {
}

// Close stops listening on the UDP address. Already Accepted connections are not closed.
func (l *Listener) Close() error {
close(l.die)
return l.conn.Close()
func (l *Listener) Close() (err error) {
l.dieOnce.Do(func() {
close(l.die)
err = l.conn.Close()
})
return
}

// closeSession notify the listener that a session has closed
Expand Down
42 changes: 34 additions & 8 deletions sess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ var pass = pbkdf2.Key(key, []byte(portSink), 4096, 32, sha1.New)

func init() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()

go echoServer()
go sinkServer()
go tinyBufferEchoServer()
println("beginning tests, encryption:salsa20, fec:10/3")
log.Println("beginning tests, encryption:salsa20, fec:10/3")
}

func dialEcho() (*UDPSession, error) {
Expand Down Expand Up @@ -115,7 +112,7 @@ func listenSink() (net.Listener, error) {
return ListenWithOptions(portSink, nil, 0, 0)
}

func echoServer() {
func echoServer() net.Listener {
l, err := listenEcho()
if err != nil {
panic(err)
Expand All @@ -138,9 +135,11 @@ func echoServer() {
go handleEcho(s.(*UDPSession))
}
}()

return l
}

func sinkServer() {
func sinkServer() net.Listener {
l, err := listenSink()
if err != nil {
panic(err)
Expand All @@ -160,9 +159,11 @@ func sinkServer() {
go handleSink(s.(*UDPSession))
}
}()

return l
}

func tinyBufferEchoServer() {
func tinyBufferEchoServer() net.Listener {
l, err := listenTinyBufferEcho()
if err != nil {
panic(err)
Expand All @@ -177,6 +178,7 @@ func tinyBufferEchoServer() {
go handleTinyBufferEcho(s.(*UDPSession))
}
}()
return l
}

///////////////////////////
Expand Down Expand Up @@ -233,6 +235,9 @@ func handleTinyBufferEcho(conn *UDPSession) {
///////////////////////////

func TestTimeout(t *testing.T) {
l := echoServer()
defer l.Close()

cli, err := dialEcho()
if err != nil {
panic(err)
Expand All @@ -250,6 +255,9 @@ func TestTimeout(t *testing.T) {
}

func TestSendRecv(t *testing.T) {
l := echoServer()
defer l.Close()

cli, err := dialEcho()
if err != nil {
panic(err)
Expand All @@ -273,6 +281,9 @@ func TestSendRecv(t *testing.T) {
}

func TestSendVector(t *testing.T) {
l := echoServer()
defer l.Close()

cli, err := dialEcho()
if err != nil {
panic(err)
Expand All @@ -298,6 +309,9 @@ func TestSendVector(t *testing.T) {
}

func TestTinyBufferReceiver(t *testing.T) {
l := tinyBufferEchoServer()
defer l.Close()

cli, err := dialTinyBufferEcho()
if err != nil {
panic(err)
Expand Down Expand Up @@ -338,6 +352,9 @@ func TestTinyBufferReceiver(t *testing.T) {
}

func TestClose(t *testing.T) {
l := echoServer()
defer l.Close()

cli, err := dialEcho()
if err != nil {
panic(err)
Expand All @@ -360,6 +377,9 @@ func TestClose(t *testing.T) {
}

func TestParallel1024CLIENT_64BMSG_64CNT(t *testing.T) {
l := echoServer()
defer l.Close()

var wg sync.WaitGroup
wg.Add(1024)
for i := 0; i < 1024; i++ {
Expand Down Expand Up @@ -396,6 +416,9 @@ func BenchmarkEchoSpeed1M(b *testing.B) {
}

func speedclient(b *testing.B, nbytes int) {
l := echoServer()
defer l.Close()

b.ReportAllocs()
cli, err := dialEcho()
if err != nil {
Expand Down Expand Up @@ -425,6 +448,9 @@ func BenchmarkSinkSpeed1M(b *testing.B) {
}

func sinkclient(b *testing.B, nbytes int) {
l := sinkServer()
defer l.Close()

b.ReportAllocs()
cli, err := dialSink()
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion tx_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ func (l *Listener) txLoop() {
for k := range txqueue {
if n, err := l.conn.WriteTo(txqueue[k].Buffers[0], txqueue[k].Addr); err == nil {
nbytes += n
xmitBuf.Put(txqueue[k].Buffers[0])
} else {
l.Close()
return
}
xmitBuf.Put(txqueue[k].Buffers[0])
}
atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(len(txqueue)))
atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
Expand Down
3 changes: 2 additions & 1 deletion tx_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (l *Listener) txLoop() {
if n, err := conn.WriteBatch(vec, 0); err == nil {
vec = vec[n:]
} else {
break
l.Close()
return
}
}

Expand Down

0 comments on commit c340cca

Please sign in to comment.