Skip to content

Commit

Permalink
lossy channel supports speed limit
Browse files Browse the repository at this point in the history
  • Loading branch information
wxiaoguang committed Feb 8, 2017
1 parent 98585da commit 1dc1262
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
demo/.idea
demo/cmake-build-debug
go/.idea
33 changes: 29 additions & 4 deletions go/kcp/lossychan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
type LossyTrick interface {
DelayMs() int
LossRatio() float64

// limit item per second, or for []byte limit bytes per second
LimitPerSecond() int
}

func LossyChannel(ch chan interface{}, sz int, nt LossyTrick) chan interface{} {
func LossyChannel(name string, ch chan interface{}, sz int, nt LossyTrick) chan interface{} {
q := make([][]interface{}, 5000)
n := 0

Expand All @@ -22,14 +25,17 @@ func LossyChannel(ch chan interface{}, sz int, nt LossyTrick) chan interface{} {

var total int64
var loss int64
var dropSpeed int64
var dropOut int64

go func() {
var tmr *time.Ticker
var speedLimit time.Time

tmr = time.NewTicker(time.Millisecond / 10)

for ch != nil || n != 0 {
select {

case v := <- ch:

if v == nil {
Expand All @@ -44,6 +50,25 @@ func LossyChannel(ch chan interface{}, sz int, nt LossyTrick) chan interface{} {
continue
}

sz := 1
if b, ok := v.([]byte); ok {
sz = len(b)
}

limitPerSec := nt.LimitPerSecond()
if limitPerSec != 0 {
now := time.Now()
if speedLimit.IsZero() {
speedLimit = now
}
if now.Before(speedLimit) {
dropSpeed++
continue
}
speedLimit = speedLimit.Add(time.Duration(sz) * time.Second / time.Duration(limitPerSec))
}


d := nt.DelayMs()
idx := (curIdx + d) % len(q)
q[idx] = append(q[idx], v)
Expand All @@ -64,7 +89,7 @@ func LossyChannel(ch chan interface{}, sz int, nt LossyTrick) chan interface{} {
case <-time.After(time.Millisecond * 5):
r := len(q[idx]) - j
n -= r
loss += int64(r)
dropOut += int64(r)
break loop
}
}
Expand All @@ -75,7 +100,7 @@ func LossyChannel(ch chan interface{}, sz int, nt LossyTrick) chan interface{} {
}
}

fmt.Printf("LossyChannel closed. total=%d, loss=%d\n", total, loss)
fmt.Printf("LossyChannel %s closed. total=%d, loss=%d, dropSpeed=%d, dropOut=%d\n", name, total, loss, dropSpeed, dropOut)
close(out)
}()
return out
Expand Down
15 changes: 10 additions & 5 deletions go/kcp/lossychan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ func (t *testChanTrick) LossRatio() float64 {
return 0.3
}

func (t *testChanTrick) LimitPerSecond() int {
return 100
}

func TestLossyChannel(test *testing.T) {
sz := 32
trick := &testChanTrick{}

chOrig := make(chan interface{}, sz)
chHijacked := LossyChannel(chOrig, sz, trick)
chHijacked := LossyChannel("", chOrig, sz, trick)

recvCount := 0;
wg := sync.WaitGroup{}
Expand All @@ -43,13 +47,14 @@ func TestLossyChannel(test *testing.T) {
wg.Done()
}()

tm := time.Now()
count := 20000
count := 200
for i := 0; i < count; i++ {
chOrig <- time.Now()
fmt.Printf("send = %d us\n", int(time.Now().Sub(tm).Seconds() * 1000000)) //make same cpu usage as the recv routine

delay := time.Second / time.Duration(trick.LimitPerSecond())
time.Sleep(delay / 2) // twice faster, remote should only recv half
}
close(chOrig)
wg.Wait()
fmt.Printf("TestLossyChannel count = %d, recvCount = %d\n", count, recvCount)
}
}
53 changes: 33 additions & 20 deletions go/kcp/lossyconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ type lossyConn struct {

deadlineRead time.Time
deadlineWrite time.Time

die chan struct{}
}

func NewLossyConn(conn net.Conn, channelSize int, readTrick, writeTrick LossyTrick) *lossyConn {
c := &lossyConn{conn: conn}

c.die = make(chan struct{}, 1)
c.chRead = make(chan interface{}, channelSize)
c.chReadTricked = LossyChannel(c.chRead, channelSize, readTrick)
c.chReadTricked = LossyChannel("read", c.chRead, channelSize, readTrick)
c.readBuf = &bytes.Buffer{}

c.chWrite = make(chan interface{}, channelSize)
c.chWriteTricked = LossyChannel(c.chWrite, channelSize, writeTrick)
c.chWriteTricked = LossyChannel("write", c.chWrite, channelSize, writeTrick)

go func() {
for v := range c.chWriteTricked {
Expand All @@ -39,28 +42,31 @@ func NewLossyConn(conn net.Conn, channelSize int, readTrick, writeTrick LossyTri
}()

go func() {
for {
c.startRead()
loop:
for c.chWrite != nil {
b := make([]byte, 4 * 1024)
c.conn.SetReadDeadline(time.Now().Add(900 * time.Millisecond))
n, err := c.conn.Read(b)
if e, ok := err.(net.Error); ok && e.Timeout() {
continue
}

if n == 0 || err != nil{
break loop
}

select {
case c.chRead <- b[:n]:
case <- c.die:
break loop
}
}
close(c.chRead)
}()

return c
}

func (c *lossyConn) startRead() (n int, err error){
b := make([]byte, 2048)
n, err = c.conn.Read(b)
if e, ok := err.(net.Error); ok && e.Timeout() {
return
}

if n == 0 || err != nil {
close(c.chRead)
return 0, io.EOF
}
c.chRead <- b[:n]
return n, err
}

func (c *lossyConn) Read(b []byte) (n int, err error) {
n, _ = c.readBuf.Read(b)
Expand All @@ -81,11 +87,11 @@ func (c *lossyConn) Read(b []byte) (n int, err error) {

select {
case v := <- c.chReadTricked:
p := v.([]byte)
if p == nil {
if v == nil {
return 0, io.EOF
}

p := v.([]byte)
n, _ = c.readBuf.Write(p)
n, _ = c.readBuf.Read(b)
case <-timer.C:
Expand Down Expand Up @@ -129,6 +135,13 @@ func (c *lossyConn) Write(b []byte) (n int, err error) {
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *lossyConn) Close() error {
close(c.die)

if c.chWrite != nil {
close(c.chWrite)
c.chWrite = nil
}

return c.conn.Close()
}

Expand Down
89 changes: 75 additions & 14 deletions go/kcp/lossyconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
"fmt"
"strconv"
"strings"
"math/rand"
"errors"
"sync/atomic"
)

type testConnTrick struct {
delayMs int
lossRatio float64
limitPerSecond int
}

func (t *testConnTrick) DelayMs() int {
Expand All @@ -22,6 +26,10 @@ func (t *testConnTrick) LossRatio() float64 {
return t.lossRatio
}

func (t *testConnTrick) LimitPerSecond() int {
return t.limitPerSecond
}

type testMockLossyConn struct {
countFeed int
countWrite int
Expand All @@ -32,18 +40,36 @@ func testLossyConnNowMs() int {
return int(time.Now().UnixNano() / int64(time.Millisecond))
}

func (c *testMockLossyConn) feed() {
if c.chFeed == nil {
c.chFeed = make(chan []byte, 1024)
}
func (c *testMockLossyConn) init() {
//make the channel large enough
c.chFeed = make(chan []byte, 100000)
}

func (c *testMockLossyConn) feed() bool {
s := fmt.Sprintf("%d\n", testLossyConnNowMs())
c.countFeed++
c.chFeed <- []byte(s)
b := make([]byte, 100 + rand.Intn(1000))
copy(b, []byte(s))
select {
case c.chFeed <- b:
c.countFeed++
return true
default:
return false
}
}

func (c *testMockLossyConn) Read(b []byte) (n int, err error) {
tmp := <- c.chFeed
copy(b, tmp)
return len(tmp), nil
select {
case tmp := <- c.chFeed:
if len(b) < len(tmp) {
panic("not enough buffer")
}
copy(b, tmp)
return len(tmp), nil
case <- time.After(500 * time.Millisecond):
return 0, errTimeout{}
}
return 0, errors.New(errBrokenPipe)
}

func (c *testMockLossyConn) Write(b []byte) (n int, err error) {
Expand Down Expand Up @@ -74,18 +100,16 @@ func (c *testMockLossyConn) SetWriteDeadline(t time.Time) error {
return nil
}

func TestLossyConn(test *testing.T) {
//readTrick := &testConnTrick{delayMs: 20, lossRatio: 0.01}
func TestLossyConnReadWrite(test *testing.T) {
readTrick := &testConnTrick{delayMs: 20, lossRatio: 0.01}
writeTrick := &testConnTrick{delayMs: 50, lossRatio: 0.05}

c := &testMockLossyConn{}
c.init()
nc := NewLossyConn(c, 1024, readTrick, writeTrick)

count := 500
countReadLoss := 0
buf := make([]byte, 4096)

for j := 0; j < readTrick.delayMs; j++ {
if c.countFeed < count {
c.feed()
Expand All @@ -94,10 +118,12 @@ func TestLossyConn(test *testing.T) {
}

for i := 0; i < count; i++ {
nc.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
if c.countFeed < count {
c.feed()
}

buf := make([]byte, 4096)
nc.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, err := nc.Read(buf)
if err != nil {
fmt.Printf("read err=%v\n", err)
Expand All @@ -114,9 +140,44 @@ func TestLossyConn(test *testing.T) {
now := testLossyConnNowMs()
nc.Write([]byte(strconv.Itoa(now)))
}
nc.Close()

<- time.After(1 * time.Second)
fmt.Printf("done, count=%d, countFeed=%d, countReadLoss=%d, countWrite=%d\n", count, c.countFeed, countReadLoss, c.countWrite)
}


func TestLossyConnSpeed(test *testing.T) {
readTrick := &testConnTrick{delayMs: 20, lossRatio: 0.01, limitPerSecond: 50 * 1024}
writeTrick := &testConnTrick{delayMs: 50, lossRatio: 0.05}

c := &testMockLossyConn{}
c.init()

nc := NewLossyConn(c, 1024, readTrick, writeTrick)

go func() {
for {
c.feed()
}
}()

totalSize := int64(0)
go func() {
buf := make([]byte, 4096)
for {
n, _ := nc.Read(buf)
atomic.AddInt64(&totalSize, int64(n))
}
}()

for i := 0; i < 3; i++ {
<- time.After(1 * time.Second)
sz := atomic.SwapInt64(&totalSize, 0)
fmt.Printf("speed=%.2f KB/s\n", float64(sz) / 1024.0)
}

nc.Close()
<- time.After(1 * time.Second)

}

0 comments on commit 1dc1262

Please sign in to comment.