Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make batchConn more generic #200

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions batchconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,8 @@ type batchConn interface {
WriteBatch(ms []ipv4.Message, flags int) (int, error)
ReadBatch(ms []ipv4.Message, flags int) (int, error)
}

type batchErrDetector interface {
ReadBatchUnavailable(err error) bool
WriteBatchUnavailable(err error) bool
}
30 changes: 30 additions & 0 deletions batchconn_generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// +build !linux

package kcp

import (
"net"
)

func toBatchConn(c net.PacketConn) batchConn {
if xconn, ok := c.(batchConn); ok {
return xconn
}
return nil
}

func readBatchUnavailable(xconn batchConn, err error) bool {
ret := false
if detector, ok := xconn.(batchErrDetector); ok {
ret = detector.ReadBatchUnavailable(err)
}
return ret
}

func writeBatchUnavailable(xconn batchConn, err error) bool {
ret := false
if detector, ok := xconn.(batchErrDetector); ok {
ret = detector.WriteBatchUnavailable(err)
}
return ret
}
81 changes: 81 additions & 0 deletions batchconn_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// +build linux

package kcp

import (
"net"
"os"

"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)

func toBatchConn(c net.PacketConn) batchConn {
if xconn, ok := c.(batchConn); ok {
return xconn
}
var xconn batchConn
if _, ok := c.(*net.UDPConn); ok {
addr, err := net.ResolveUDPAddr("udp", c.LocalAddr().String())
if err == nil {
if addr.IP.To4() != nil {
xconn = ipv4.NewPacketConn(c)
} else {
xconn = ipv6.NewPacketConn(c)
}
}
}
return xconn
}

func isPacketConn(xconn batchConn) bool {
if _, ok := xconn.(*ipv4.PacketConn); ok {
return true
}
if _, ok := xconn.(*ipv6.PacketConn); ok {
return true
}
return false
}

func readBatchUnavailable(xconn batchConn, err error) bool {
if isPacketConn(xconn) {
// compatibility issue:
// for linux kernel<=2.6.32, support for sendmmsg is not available
// an error of type os.SyscallError will be returned
if operr, ok := err.(*net.OpError); ok {
if se, ok := operr.Err.(*os.SyscallError); ok {
if se.Syscall == "recvmmsg" {
return true
}
}
}
return false
}
ret := false
if detector, ok := xconn.(batchErrDetector); ok {
ret = detector.ReadBatchUnavailable(err)
}
return ret
}

func writeBatchUnavailable(xconn batchConn, err error) bool {
if isPacketConn(xconn) {
// compatibility issue:
// for linux kernel<=2.6.32, support for sendmmsg is not available
// an error of type os.SyscallError will be returned
if operr, ok := err.(*net.OpError); ok {
if se, ok := operr.Err.(*os.SyscallError); ok {
if se.Syscall == "sendmmsg" {
return true
}
}
}
return false
}
ret := false
if detector, ok := xconn.(batchErrDetector); ok {
ret = detector.WriteBatchUnavailable(err)
}
return ret
}
81 changes: 81 additions & 0 deletions readloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,29 @@ import (
"sync/atomic"

"github.com/pkg/errors"
"golang.org/x/net/ipv4"
)

func (s *UDPSession) readLoop() {
// default version
if s.xconn == nil {
s.defaultReadLoop()
return
}
s.batchReadLoop()
}

func (l *Listener) monitor() {
xconn := toBatchConn(l.conn)

// default version
if xconn == nil {
l.defaultMonitor()
return
}
l.batchMonitor(xconn)
}

func (s *UDPSession) defaultReadLoop() {
buf := make([]byte, mtuLimit)
var src string
Expand Down Expand Up @@ -37,3 +58,63 @@ func (l *Listener) defaultMonitor() {
}
}
}

func (s *UDPSession) batchReadLoop() {
// x/net version
var src string
msgs := make([]ipv4.Message, batchSize)
for k := range msgs {
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
}

for {
if count, err := s.xconn.ReadBatch(msgs, 0); err == nil {
for i := 0; i < count; i++ {
msg := &msgs[i]
// make sure the packet is from the same source
if msg.Addr.String() != src {
if len(src) == 0 { // set source address if nil
src = msg.Addr.String()
} else {
atomic.AddUint64(&DefaultSnmp.InErrs, 1)
continue
}
}

// source and size has validated
s.packetInput(msg.Buffers[0][:msg.N])
}
} else {
if readBatchUnavailable(s.xconn, err) {
s.defaultReadLoop()
return
}
s.notifyReadError(errors.WithStack(err))
return
}
}
}

func (l *Listener) batchMonitor(xconn batchConn) {
// x/net version
msgs := make([]ipv4.Message, batchSize)
for k := range msgs {
msgs[k].Buffers = [][]byte{make([]byte, mtuLimit)}
}

for {
if count, err := xconn.ReadBatch(msgs, 0); err == nil {
for i := 0; i < count; i++ {
msg := &msgs[i]
l.packetInput(msg.Buffers[0][:msg.N], msg.Addr)
}
} else {
if readBatchUnavailable(xconn, err) {
l.defaultMonitor()
return
}
l.notifyReadError(errors.WithStack(err))
return
}
}
}
11 changes: 0 additions & 11 deletions readloop_generic.go

This file was deleted.

111 changes: 0 additions & 111 deletions readloop_linux.go

This file was deleted.

13 changes: 2 additions & 11 deletions sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,8 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
sess.block = block
sess.recvbuf = make([]byte, mtuLimit)

// cast to writebatch conn
if _, ok := conn.(*net.UDPConn); ok {
addr, err := net.ResolveUDPAddr("udp", conn.LocalAddr().String())
if err == nil {
if addr.IP.To4() != nil {
sess.xconn = ipv4.NewPacketConn(conn)
} else {
sess.xconn = ipv6.NewPacketConn(conn)
}
}
}
// cast to batchConn, can be nil
sess.xconn = toBatchConn(conn)

// FEC codec initialization
sess.fecDecoder = newFECDecoder(dataShards, parityShards)
Expand Down
Loading