Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
esrrhs committed Oct 26, 2019
1 parent a2f8819 commit 2ffba22
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 39 deletions.
8 changes: 4 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ Usage:
-tcp_bs tcp的发送接收缓冲区大小,默认1MB
Tcp send and receive buffer size, default 1MB
-tcp_mw tcp的最大窗口,默认100
-tcp_mw tcp的最大窗口,默认10000
The maximum window of tcp, the default is 100
-tcp_rst tcp的超时发送时间,默认200ms
Tcp timeout resend time, default 200ms
-tcp_rst tcp的超时发送时间,默认400ms
Tcp timeout resend time, default 400ms
`

func main() {
Expand All @@ -59,7 +59,7 @@ func main() {
tcpmode := flag.Int("tcp", 0, "tcp mode")
tcpmode_buffersize := flag.Int("tcp_bs", 1024*1024, "tcp mode buffer size")
tcpmode_maxwin := flag.Int("tcp_mw", 100, "tcp mode max win")
tcpmode_resend_timems := flag.Int("tcp_rst", 200, "tcp mode resend time ms")
tcpmode_resend_timems := flag.Int("tcp_rst", 400, "tcp mode resend time ms")
flag.Usage = func() {
fmt.Printf(usage)
}
Expand Down
59 changes: 49 additions & 10 deletions framemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type FrameMgr struct {
close bool
remoteclosed bool
closesend bool

lastPingTime int64
rttms int
}

func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
Expand All @@ -40,7 +43,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
windowsize: windowsize, resend_timems: resend_timems,
sendwin: list.New(), sendlist: list.New(), sendid: 0,
recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false, closesend: false}
close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttms: resend_timems}

return fm
}
Expand All @@ -66,6 +70,8 @@ func (fm *FrameMgr) Update() {
fm.combineWindowToRecvBuffer()

fm.calSendList()

fm.ping()
}

func (fm *FrameMgr) cutSendBufferToWindow() {
Expand Down Expand Up @@ -166,6 +172,10 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3
} else if f.Type == (int32)(Frame_DATA) {
tmpackto[f.Id] = f
loggo.Debug("recv data %d %d", f.Id, len(f.Data))
} else if f.Type == (int32)(Frame_PING) {
fm.processPing(f)
} else if f.Type == (int32)(Frame_PONG) {
fm.processPong(f)
}
}
fm.recvlist.Init()
Expand Down Expand Up @@ -202,17 +212,18 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
Dataid: make([]int32, len(tmpackto))}
index := 0
for id, rf := range tmpackto {
f.Dataid[index] = id
index++
fm.addToRecvWin(rf)
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data))
if fm.addToRecvWin(rf) {
f.Dataid[index] = id
index++
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data))
}
}
fm.sendlist.PushBack(f)
loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
}

func (fm *FrameMgr) addToRecvWin(rf *Frame) {
func (fm *FrameMgr) addToRecvWin(rf *Frame) bool {

begin := fm.recvid
end := fm.recvid + fm.windowsize
Expand All @@ -222,14 +233,14 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) {
}
if id > end || id < begin {
loggo.Debug("recv frame not in range %d %d %d", begin, end, id)
return
return false
}

for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == rf.Id {
loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data))
return
return true
}
}

Expand All @@ -239,12 +250,13 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) {
if fm.compareId(rf, f) < 0 {
fm.recvwin.InsertBefore(rf, e)
loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data), f.Id)
return
return true
}
}

fm.recvwin.PushBack(rf)
loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data))
return true
}

func (fm *FrameMgr) compareId(lf *Frame, rf *Frame) int {
Expand Down Expand Up @@ -300,7 +312,7 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
id = fm.recvid
for len(reqtmp) < fm.windowsize && e != nil {
f := e.Value.(*Frame)
loggo.Debug("start add req id %d %d", f.Id, id)
loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id)
if f.Id != (int32)(id) {
reqtmp[id]++
loggo.Debug("add req id %d ", id)
Expand Down Expand Up @@ -353,3 +365,30 @@ func (fm *FrameMgr) Close() {
func (fm *FrameMgr) IsRemoteClosed() bool {
return fm.remoteclosed
}

func (fm *FrameMgr) ping() {
cur := time.Now().UnixNano()
if cur-fm.lastPingTime > int64(1000*1000) {
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
Id: 0}
fm.sendlist.PushBack(f)
loggo.Debug("send ping %d", cur)
fm.lastPingTime = cur
}
}

func (fm *FrameMgr) processPing(f *Frame) {
rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime,
Id: 0}
fm.sendlist.PushBack(rf)
loggo.Debug("recv ping %d", f.Sendtime)
}

func (fm *FrameMgr) processPong(f *Frame) {
cur := time.Now().UnixNano()
if cur > f.Sendtime {
rtt := (cur - f.Sendtime) / 1000
fm.rttms = (fm.rttms + (int)(rtt)) / 2
loggo.Debug("recv pong %d %d", rtt, fm.rttms)
}
}
56 changes: 31 additions & 25 deletions msg.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions msg.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ message Frame {
DATA = 0;
REQ = 1;
ACK = 2;
PING = 3;
PONG = 4;
}

int32 type = 1;
Expand Down

0 comments on commit 2ffba22

Please sign in to comment.