Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
esrrhs committed Oct 28, 2019
1 parent 1f8263a commit 2b6ecc4
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 51 deletions.
17 changes: 10 additions & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const (
)

func NewClient(addr string, server string, target string, timeout int, key int,
tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int) (*Client, error) {
tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int,
tcpmode_stat int) (*Client, error) {

var ipaddr *net.UDPAddr
var tcpaddr *net.TCPAddr
Expand Down Expand Up @@ -56,6 +57,7 @@ func NewClient(addr string, server string, target string, timeout int, key int,
tcpmode_maxwin: tcpmode_maxwin,
tcpmode_resend_timems: tcpmode_resend_timems,
tcpmode_compress: tcpmode_compress,
tcpmode_stat: tcpmode_stat,
}, nil
}

Expand All @@ -72,6 +74,7 @@ type Client struct {
tcpmode_maxwin int
tcpmode_resend_timems int
tcpmode_compress int
tcpmode_stat int

ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr
Expand Down Expand Up @@ -209,7 +212,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
uuid := UniqueId()
tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr)

fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress)
fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat)

now := time.Now()
clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false,
Expand All @@ -233,7 +236,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat,
p.timeout)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
Expand Down Expand Up @@ -293,7 +296,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0,
p.tcpmode, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
Expand Down Expand Up @@ -354,7 +357,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0,
p.tcpmode, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
Expand Down Expand Up @@ -426,7 +429,7 @@ func (p *Client) Accept() error {
clientConn.activeSendTime = now
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), bytes[:n],
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0,
p.tcpmode, 0, 0, 0, 0, 0,
p.timeout)

p.sequence++
Expand Down Expand Up @@ -529,7 +532,7 @@ func (p *Client) ping() {
b, _ := now.MarshalBinary()
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, "", (uint32)(MyMsg_PING), b,
SEND_PROTO, RECV_PROTO, p.key,
0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0,
0)
loggo.Info("ping %s %s %d %d %d %d", p.addrServer, now.String(), p.sproto, p.rproto, p.id, p.sequence)
p.sequence++
Expand Down
7 changes: 6 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ Usage:
-tcp_gz 当数据包超过这个大小,tcp将压缩数据,0表示不压缩,默认0
Tcp will compress data when the packet exceeds this size, 0 means no compression, default 0
-tcp_stat 打印tcp的监控,默认0
Print tcp connection statistic, default 0 is off
-nolog 不写日志文件,只打印标准输出,默认0
Do not write log files, only print standard output, default 0 is off
Expand All @@ -70,6 +73,7 @@ func main() {
tcpmode_resend_timems := flag.Int("tcp_rst", 400, "tcp mode resend time ms")
tcpmode_compress := flag.Int("tcp_gz", 0, "tcp data compress")
nolog := flag.Int("nolog", 0, "write log file")
tcpmode_stat := flag.Int("tcp_stat", 0, "print tcp stat")
flag.Usage = func() {
fmt.Printf(usage)
}
Expand Down Expand Up @@ -117,7 +121,8 @@ func main() {
}

c, err := pingtunnel.NewClient(*listen, *server, *target, *timeout, *key,
*tcpmode, *tcpmode_buffersize, *tcpmode_maxwin, *tcpmode_resend_timems, *tcpmode_compress)
*tcpmode, *tcpmode_buffersize, *tcpmode_maxwin, *tcpmode_resend_timems, *tcpmode_compress,
*tcpmode_stat)
if err != nil {
loggo.Error("ERROR: %s", err.Error())
return
Expand Down
127 changes: 121 additions & 6 deletions framemgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,30 @@ import (
"github.com/esrrhs/go-engine/src/loggo"
"github.com/esrrhs/go-engine/src/rbuffergo"
"io"
"strconv"
"sync"
"time"
)

type FrameStat struct {
sendDataNum int
recvDataNum int
sendReqNum int
recvReqNum int
sendAckNum int
recvAckNum int
sendDataNumsMap map[int32]int
recvDataNumsMap map[int32]int
sendReqNumsMap map[int32]int
recvReqNumsMap map[int32]int
sendAckNumsMap map[int32]int
recvAckNumsMap map[int32]int
sendping int
sendpong int
recvping int
recvpong int
}

type FrameMgr struct {
sendb *rbuffergo.RBuffergo
recvb *rbuffergo.RBuffergo
Expand Down Expand Up @@ -40,9 +60,13 @@ type FrameMgr struct {
sendmap map[int32]int64

connected bool

fs *FrameStat
openstat int
lastPrintStat int64
}

func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int) *FrameMgr {
func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int, openstat int) *FrameMgr {

sendb := rbuffergo.New(buffersize, false)
recvb := rbuffergo.New(buffersize, false)
Expand All @@ -55,8 +79,10 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int
close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000),
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64),
connected: false}

connected: false, openstat: openstat, lastPrintStat: time.Now().UnixNano()}
if openstat > 0 {
fm.resetStat()
}
return fm
}

Expand All @@ -83,6 +109,8 @@ func (fm *FrameMgr) Update() {
fm.calSendList()

fm.ping()

fm.printStat()
}

func (fm *FrameMgr) cutSendBufferToWindow() {
Expand Down Expand Up @@ -175,6 +203,10 @@ func (fm *FrameMgr) calSendList() {
fm.sendlist.PushBack(f)
f.Resend = false
fm.sendmap[f.Id] = cur
if fm.openstat > 0 {
fm.fs.sendDataNum++
fm.fs.sendDataNumsMap[f.Id]++
}
loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data.Data))
}
}
Expand Down Expand Up @@ -212,6 +244,10 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3
}
} else if f.Type == (int32)(Frame_DATA) {
tmpackto[f.Id] = f
if fm.openstat > 0 {
fm.fs.recvDataNum++
fm.fs.recvDataNumsMap[f.Id]++
}
loggo.Debug("recv data %d %d", f.Id, len(f.Data.Data))
} else if f.Type == (int32)(Frame_PING) {
fm.processPing(f)
Expand All @@ -227,7 +263,7 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3

func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) {

for id, _ := range tmpreq {
for id, num := range tmpreq {
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
Expand All @@ -236,9 +272,13 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
break
}
}
if fm.openstat > 0 {
fm.fs.recvReqNum += num
fm.fs.recvReqNumsMap[id] += num
}
}

for id, _ := range tmpack {
for id, num := range tmpack {
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
Expand All @@ -248,6 +288,10 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
break
}
}
if fm.openstat > 0 {
fm.fs.recvAckNum += num
fm.fs.recvAckNumsMap[id] += num
}
}

if len(tmpackto) > 0 {
Expand All @@ -257,6 +301,10 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
if fm.addToRecvWin(rf) {
tmp[index] = id
index++
if fm.openstat > 0 {
fm.fs.sendAckNum++
fm.fs.sendAckNumsMap[id]++
}
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data.Data))
}
}
Expand Down Expand Up @@ -407,6 +455,10 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
for id, _ := range reqtmp {
f.Dataid[index] = (int32)(id)
index++
if fm.openstat > 0 {
fm.fs.sendReqNum++
fm.fs.sendReqNumsMap[(int32)(id)]++
}
}
fm.sendlist.PushBack(f)
loggo.Debug("send req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
Expand Down Expand Up @@ -442,18 +494,25 @@ func (fm *FrameMgr) IsRemoteClosed() bool {
func (fm *FrameMgr) ping() {
cur := time.Now().UnixNano()
if cur-fm.lastPingTime > (int64)(time.Second) {
fm.lastPingTime = cur
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
Id: 0}
fm.sendlist.PushBack(f)
loggo.Debug("send ping %d", cur)
fm.lastPingTime = cur
if fm.openstat > 0 {
fm.fs.sendping++
}
}
}

func (fm *FrameMgr) processPing(f *Frame) {
rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime,
Id: 0}
fm.sendlist.PushBack(rf)
if fm.openstat > 0 {
fm.fs.recvping++
fm.fs.sendpong++
}
loggo.Debug("recv ping %d", f.Sendtime)
}

Expand All @@ -462,6 +521,9 @@ func (fm *FrameMgr) processPong(f *Frame) {
if cur > f.Sendtime {
rtt := cur - f.Sendtime
fm.rttns = (fm.rttns + rtt) / 2
if fm.openstat > 0 {
fm.fs.recvpong++
}
loggo.Debug("recv pong %d %dms", rtt, fm.rttns/1000/1000)
}
}
Expand Down Expand Up @@ -567,3 +629,56 @@ func (fm *FrameMgr) deCompressData(src []byte) (error, []byte) {
r.Close()
return nil, out.Bytes()
}

func (fm *FrameMgr) resetStat() {
fm.fs = &FrameStat{}
fm.fs.sendDataNumsMap = make(map[int32]int)
fm.fs.recvDataNumsMap = make(map[int32]int)
fm.fs.sendReqNumsMap = make(map[int32]int)
fm.fs.recvReqNumsMap = make(map[int32]int)
fm.fs.sendAckNumsMap = make(map[int32]int)
fm.fs.recvAckNumsMap = make(map[int32]int)
}

func (fm *FrameMgr) printStat() {
if fm.openstat > 0 {
cur := time.Now().UnixNano()
if cur-fm.lastPrintStat > (int64)(time.Second) {
fm.lastPrintStat = cur
fs := fm.fs
loggo.Info("\nsendDataNum %d\nrecvDataNum %d\nsendReqNum %d\nrecvReqNum %d\nsendAckNum %d\nrecvAckNum %d\n"+
"sendDataNumsMap %s\nrecvDataNumsMap %s\nsendReqNumsMap %s\nrecvReqNumsMap %s\nsendAckNumsMap %s\nrecvAckNumsMap %s\n"+
"sendping %d\nrecvping %d\nsendpong %d\nrecvpong %d\n",
fs.sendDataNum, fs.recvDataNum,
fs.sendReqNum, fs.recvReqNum,
fs.sendAckNum, fs.recvAckNum,
fm.printStatMap(&fs.sendDataNumsMap), fm.printStatMap(&fs.recvDataNumsMap),
fm.printStatMap(&fs.sendReqNumsMap), fm.printStatMap(&fs.recvReqNumsMap),
fm.printStatMap(&fs.sendAckNumsMap), fm.printStatMap(&fs.recvAckNumsMap),
fs.sendping, fs.recvping,
fs.sendpong, fs.recvpong)
fm.resetStat()
}
}
}

func (fm *FrameMgr) printStatMap(m *map[int32]int) string {
tmp := make(map[int]int)
for _, v := range *m {
tmp[v]++
}
max := 0
for k, _ := range tmp {
if k > max {
max = k
}
}
var ret string
for i := 1; i <= max; i++ {
ret += strconv.Itoa(i) + "->" + strconv.Itoa(tmp[i]) + ","
}
if len(ret) <= 0 {
ret = "none"
}
return ret
}
Loading

0 comments on commit 2b6ecc4

Please sign in to comment.