Skip to content

Commit

Permalink
合并封包部分抽象成单独的结构
Browse files Browse the repository at this point in the history
  • Loading branch information
davyxu committed Sep 18, 2016
1 parent d9501f3 commit 3fd54c4
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 76 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

# 性能测试

go test -v github.com/davyxu/cellnet/test/benchmark
go test -v github.com/davyxu/cellnet/benchmark

CPU: i7 6700 3.4GHz

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (self *QPSMeter) Turn() (ret int) {
// 均值
func (self *QPSMeter) Average() int {

self.qpsGuard.Lock()

defer self.qpsGuard.Unlock()

if self.count == 0 {
return 0
}
Expand Down
53 changes: 53 additions & 0 deletions socket/pktlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package socket

import (
"sync"

"github.com/davyxu/cellnet"
)

type PacketList struct {
list []*cellnet.Packet
listGuard sync.Mutex
listCond *sync.Cond
}

func (self *PacketList) Add(p *cellnet.Packet) {
self.listGuard.Lock()
self.list = append(self.list, p)
self.listGuard.Unlock()

self.listCond.Signal()
}

func (self *PacketList) Reset() {
self.list = self.list[0:0]
}

func (self *PacketList) BeginPick() []*cellnet.Packet {

self.listGuard.Lock()

for len(self.list) == 0 {
self.listCond.Wait()
}

self.listGuard.Unlock()

self.listGuard.Lock()

return self.list
}

func (self *PacketList) EndPick() {

self.Reset()
self.listGuard.Unlock()
}

func NewPacketList() *PacketList {
self := &PacketList{}
self.listCond = sync.NewCond(&self.listGuard)

return self
}
66 changes: 35 additions & 31 deletions socket/pktstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
// 封包流
type PacketStream interface {
Read() (*cellnet.Packet, error)
Write(pkt *cellnet.Packet, flush bool) error
Write(pkt *cellnet.Packet) error
Flush() error
Close() error
Raw() net.Conn
}
Expand All @@ -31,11 +32,11 @@ type ltvStream struct {
conn net.Conn
sendtagGuard sync.RWMutex

sdWriter *bufio.Writer
sdHeadBuf *bytes.Buffer
outputWriter *bufio.Writer
outputHeadBuffer *bytes.Buffer

rdHeadBuf []byte
rdHeadReader *bytes.Reader
inputHeadBuffer []byte
headReader *bytes.Reader
}

var (
Expand All @@ -47,30 +48,30 @@ var (
// 从socket读取1个封包,并返回
func (self *ltvStream) Read() (p *cellnet.Packet, err error) {

if _, err = self.rdHeadReader.Seek(0, 0); err != nil {
if _, err = self.headReader.Seek(0, 0); err != nil {
return nil, err
}

if _, err = io.ReadFull(self.conn, self.rdHeadBuf); err != nil {
if _, err = io.ReadFull(self.conn, self.inputHeadBuffer); err != nil {
return nil, err
}

p = &cellnet.Packet{}

// 读取ID
if err = binary.Read(self.rdHeadReader, binary.LittleEndian, &p.MsgID); err != nil {
if err = binary.Read(self.headReader, binary.LittleEndian, &p.MsgID); err != nil {
return nil, err
}

// 读取序号
var ser uint16
if err = binary.Read(self.rdHeadReader, binary.LittleEndian, &ser); err != nil {
if err = binary.Read(self.headReader, binary.LittleEndian, &ser); err != nil {
return nil, err
}

// 读取整包大小
var fullsize uint16
if err = binary.Read(self.rdHeadReader, binary.LittleEndian, &fullsize); err != nil {
if err = binary.Read(self.headReader, binary.LittleEndian, &fullsize); err != nil {
return nil, err
}

Expand Down Expand Up @@ -102,49 +103,52 @@ func (self *ltvStream) Read() (p *cellnet.Packet, err error) {
}

// 将一个封包发送到socket
func (self *ltvStream) Write(pkt *cellnet.Packet, flush bool) (err error) {

self.sdHeadBuf.Reset()
func (self *ltvStream) Write(pkt *cellnet.Packet) (err error) {

// 防止将Send放在go内造成的多线程冲突问题
self.sendtagGuard.Lock()
defer self.sendtagGuard.Unlock()

self.outputHeadBuffer.Reset()

// 写ID
if err = binary.Write(self.sdHeadBuf, binary.LittleEndian, pkt.MsgID); err != nil {
if err = binary.Write(self.outputHeadBuffer, binary.LittleEndian, pkt.MsgID); err != nil {
return err
}

// 写序号
if err = binary.Write(self.sdHeadBuf, binary.LittleEndian, self.sendser); err != nil {
if err = binary.Write(self.outputHeadBuffer, binary.LittleEndian, self.sendser); err != nil {
return err
}

// 写包大小
if err = binary.Write(self.sdHeadBuf, binary.LittleEndian, uint16(len(pkt.Data)+PackageHeaderSize)); err != nil {
if err = binary.Write(self.outputHeadBuffer, binary.LittleEndian, uint16(len(pkt.Data)+PackageHeaderSize)); err != nil {
return err
}

// 发包头
if _, err = self.sdWriter.Write(self.sdHeadBuf.Bytes()); err != nil {
if _, err = self.outputWriter.Write(self.outputHeadBuffer.Bytes()); err != nil {
return err
}

// 发包内容
if _, err = self.sdWriter.Write(pkt.Data); err != nil {
if _, err = self.outputWriter.Write(pkt.Data); err != nil {
return err
}

// 增加序号值
self.sendser++

if flush {
if err = self.sdWriter.Flush(); err != nil && err != io.ErrShortWrite {
return err
}
return
}

func (self *ltvStream) Flush() error {

if err := self.outputWriter.Flush(); err != nil && err != io.ErrShortWrite {
return err
}

return
return nil
}

// 关闭
Expand All @@ -161,14 +165,14 @@ func (self *ltvStream) Raw() net.Conn {
func NewPacketStream(conn net.Conn) PacketStream {

s := &ltvStream{
conn: conn,
recvser: 1,
sendser: 1,
sdWriter: bufio.NewWriter(conn),
sdHeadBuf: bytes.NewBuffer([]byte{}),
rdHeadBuf: make([]byte, PackageHeaderSize),
}
s.rdHeadReader = bytes.NewReader(s.rdHeadBuf)
conn: conn,
recvser: 1,
sendser: 1,
outputWriter: bufio.NewWriter(conn),
outputHeadBuffer: bytes.NewBuffer([]byte{}),
inputHeadBuffer: make([]byte, PackageHeaderSize),
}
s.headReader = bytes.NewReader(s.inputHeadBuffer)

return s
}
62 changes: 18 additions & 44 deletions socket/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package socket

import (
"sync"
"time"

"github.com/davyxu/cellnet"
"github.com/golang/protobuf/proto"
Expand All @@ -24,9 +23,7 @@ type ltvSession struct {

needNotifyWrite bool // 是否需要通知写线程关闭

sendList []interface{}
sendListGuard sync.Mutex
sendListCond *sync.Cond
sendList *PacketList
}

func (self *ltvSession) ID() int64 {
Expand All @@ -38,7 +35,7 @@ func (self *ltvSession) FromPeer() cellnet.Peer {
}

func (self *ltvSession) Close() {
self.pushSendMsg(closeWritePacket{})
self.sendList.Add(&cellnet.Packet{})
}

func (self *ltvSession) Send(data interface{}) {
Expand All @@ -64,60 +61,38 @@ func (self *ltvSession) Send(data interface{}) {
func (self *ltvSession) RawSend(pkt *cellnet.Packet) {

if pkt != nil {
self.pushSendMsg(pkt)
self.sendList.Add(pkt)
}
}

func (self *ltvSession) pushSendMsg(msg interface{}) {
self.sendListGuard.Lock()
self.sendList = append(self.sendList, msg)
self.sendListGuard.Unlock()

self.sendListCond.Signal()
}

// 发送线程
func (self *ltvSession) sendThread() {

packetList := make([]*cellnet.Packet, 0)

for {
self.sendListGuard.Lock()

for len(self.sendList) == 0 {
self.sendListCond.Wait()
}

self.sendListGuard.Unlock()

//强制sleep,积累消息(用于批量flush)
time.Sleep(time.Millisecond)
packetList := self.sendList.BeginPick()

willExit := false

self.sendListGuard.Lock()
for _, p := range packetList {

for _, v := range self.sendList {
switch v.(type) {
case *cellnet.Packet:
packetList = append(packetList, v.(*cellnet.Packet))
case closeWritePacket:
if p.MsgID == 0 {
willExit = true
}
}
self.sendList = self.sendList[0:0]
} else {

self.sendListGuard.Unlock()
if err := self.stream.Write(p); err != nil {
willExit = true
break
}

for i, p := range packetList {
//当发送最后一个消息,且后续消息列表为空时,进行flush
if err := self.stream.Write(p, len(packetList) == (i+1) && len(self.sendList) == 0); err != nil {
willExit = true
break
}

}

packetList = packetList[0:0]
self.sendList.EndPick()

if err := self.stream.Flush(); err != nil {
willExit = true
}

if willExit {
goto exitsendloop
Expand Down Expand Up @@ -175,10 +150,9 @@ func newSession(stream PacketStream, eq cellnet.EventQueue, p cellnet.Peer) *ltv
stream: stream,
p: p,
needNotifyWrite: true,
sendList: NewPacketList(),
}

self.sendListCond = sync.NewCond(&self.sendListGuard)

// 布置接收和发送2个任务
// bug fix感谢viwii提供的线索
self.endSync.Add(2)
Expand Down

0 comments on commit 3fd54c4

Please sign in to comment.