Skip to content

Commit

Permalink
optimize protocol encode/decode; optimize simple serialize
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei28 committed Mar 7, 2018
1 parent 1ce912b commit 8e9686f
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 114 deletions.
2 changes: 2 additions & 0 deletions core/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
defaultSerialize = "simple"
)

//TODO int param cache

// GetIdentity return the identity of url. identity info includes protocol, host, port, path, group
// the identity will cached, so must clear cached info after update above info by calling ClearCachedInfo()
func (u *URL) GetIdentity() string {
Expand Down
18 changes: 3 additions & 15 deletions endpoint/motanEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type MotanEndpoint struct {
channels *ChannelPool
destroyCh chan struct{}
available bool
mux sync.RWMutex
errorCount uint32
proxy bool

Expand All @@ -43,9 +42,7 @@ type MotanEndpoint struct {
}

func (m *MotanEndpoint) setAvailable(available bool) {
m.mux.Lock()
m.available = available
m.mux.Unlock()
}

func (m *MotanEndpoint) SetSerialization(s motan.Serialization) {
Expand Down Expand Up @@ -224,8 +221,6 @@ func (m *MotanEndpoint) SetURL(url *motan.URL) {
}

func (m *MotanEndpoint) IsAvailable() bool {
m.mux.RLock()
defer m.mux.RUnlock()
return m.available
}

Expand Down Expand Up @@ -422,12 +417,7 @@ func (c *Channel) Call(msg *mpro.Message, deadline time.Duration, rc *motan.RPCC
}

func (c *Channel) IsClosed() bool {
select {
case <-c.shutdownCh:
return true
default:
return false
}
return c.shutdown
}

func (c *Channel) recv() {
Expand All @@ -438,7 +428,7 @@ func (c *Channel) recv() {

func (c *Channel) recvLoop() error {
for {
res, err := mpro.DecodeFromReader(c.bufRead)
res, err := mpro.Decode(c.bufRead)
if err != nil {
return err
}
Expand Down Expand Up @@ -537,8 +527,6 @@ type ChannelPool struct {
}

func (c *ChannelPool) getChannels() chan *Channel {
c.channelsLock.Lock()
defer c.channelsLock.Unlock()
channels := c.channels
return channels
}
Expand Down Expand Up @@ -579,7 +567,7 @@ func retChannelPool(channels chan *Channel, channel *Channel) (error error) {
}

func (c *ChannelPool) Close() error {
c.channelsLock.Lock()
c.channelsLock.Lock() // to prevent channels closed many times
channels := c.channels
c.channels = nil
c.factory = nil
Expand Down
139 changes: 71 additions & 68 deletions protocol/motanProtocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"errors"
"io"
"io/ioutil"
"strings"

motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/log"
Expand Down Expand Up @@ -214,11 +213,8 @@ func BuildHeader(msgType int, proxy bool, serialize int, requestID uint64, msgSt
}

func (msg *Message) Encode() (buf *bytes.Buffer) {
buf = new(bytes.Buffer)
var metabuf bytes.Buffer
var meta map[string]string
meta = msg.Metadata
for k, v := range meta {
metabuf := bytes.NewBuffer(make([]byte, 0, 256))
for k, v := range msg.Metadata {
metabuf.WriteString(k)
metabuf.WriteString("\n")
metabuf.WriteString(v)
Expand All @@ -231,85 +227,98 @@ func (msg *Message) Encode() (buf *bytes.Buffer) {
metasize = 0
}

body := msg.Body
bodysize := int32(len(body))
binary.Write(buf, binary.BigEndian, msg.Header)
binary.Write(buf, binary.BigEndian, metasize)
bodysize := int32(len(msg.Body))
buf = bytes.NewBuffer(make([]byte, 0, (HeaderLength + metasize + bodysize + 8)))

// encode header.
temp := make([]byte, 8, 8)
binary.BigEndian.PutUint16(temp, msg.Header.Magic)
buf.Write(temp[:2])
buf.WriteByte(msg.Header.MsgType)
buf.WriteByte(msg.Header.VersionStatus)
buf.WriteByte(msg.Header.Serialize)
binary.BigEndian.PutUint64(temp, msg.Header.RequestID)
buf.Write(temp)

// encode meta
binary.BigEndian.PutUint32(temp, uint32(metasize))
buf.Write(temp[:4])
if metasize > 0 {
binary.Write(buf, binary.BigEndian, metabuf.Bytes()[:metasize])
buf.Write(metabuf.Bytes()[:metasize])
}

binary.Write(buf, binary.BigEndian, bodysize)
// encode body
binary.BigEndian.PutUint32(temp, uint32(bodysize))
buf.Write(temp[:4])
if bodysize > 0 {
binary.Write(buf, binary.BigEndian, body)
buf.Write(msg.Body)
}
return buf
}

// Decode decode one message from buffer
func Decode(reqbuf *bytes.Buffer) *Message {
header := &Header{}
binary.Read(reqbuf, binary.BigEndian, header)
metasize := readInt32(reqbuf)
metamap := make(map[string]string)
if metasize > 0 {
metadata := string(reqbuf.Next(int(metasize)))
values := strings.Split(metadata, "\n")
size := len(values)
for i := 0; i < size; i++ {
key := values[i]
i++
if i >= size {
vlog.Errorf("decode message fail!metadata not paired. header:%v, meta:%s\n", header, metadata)
return nil
}
metamap[key] = values[i]
}
func Decode(buf *bufio.Reader) (msg *Message, err error) {
temp := make([]byte, HeaderLength, HeaderLength)

}
bodysize := readInt32(reqbuf)
var body []byte
if bodysize > 0 {
body = reqbuf.Next(int(bodysize))
} else {
body = make([]byte, 0)
}
msg := &Message{header, metamap, body, Req}
return msg
}

// DecodeFromReader decode one message from reader
func DecodeFromReader(buf *bufio.Reader) (msg *Message, err error) {
header := &Header{}
err = binary.Read(buf, binary.BigEndian, header)
// decode header
_, err = io.ReadAtLeast(buf, temp, HeaderLength)
if err != nil {
vlog.Errorf("not enough bytes to decode motan message header. err:%v\n", err)
return nil, errors.New("not enough bytes to decode motan message header.")
}
mn := binary.BigEndian.Uint16(temp[:2])
if mn != MotanMagic {
vlog.Errorf("worng magic num:%d, err:%v\n", mn, err)
return nil, errors.New("motan magic num not correct.")
}
header := &Header{Magic: MotanMagic}
header.MsgType = temp[2]
header.VersionStatus = temp[3]
header.Serialize = temp[4]
header.RequestID = binary.BigEndian.Uint64(temp[5:])

// decode meta
_, err = io.ReadAtLeast(buf, temp[:4], 4)
if err != nil {
vlog.Errorf("decode motan message fail. err:%v\n", err)
return nil, err
}

metasize := readInt32(buf)
metasize := int(binary.BigEndian.Uint32(temp[:4]))
metamap := make(map[string]string)
if metasize > 0 {
metadata, err := readBytes(buf, int(metasize))
metadata, err := readBytes(buf, metasize)
if err != nil {
return nil, err
}
values := strings.Split(string(metadata), "\n")
size := len(values)
for i := 0; i < size; i++ {
key := values[i]
i++
if i >= size {
vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s\n", header, metadata)
return nil, errors.New("decode message fail, metadata not paired")
s, e := 0, 0
var k string
for i := 0; i <= metasize; i++ {
if i == metasize || metadata[i] == '\n' {
e = i
if k == "" {
k = string(metadata[s:e])
} else {
metamap[k] = string(metadata[s:e])
k = ""
}
s = i + 1
}
metamap[key] = values[i]
}
if k != "" {
vlog.Errorf("decode message fail, metadata not paired. header:%v, meta:%s\n", header, metadata)
return nil, errors.New("decode message fail, metadata not paired")
}
}

//decode body
_, err = io.ReadAtLeast(buf, temp[:4], 4)
if err != nil {
vlog.Errorf("decode motan message fail. err:%v\n", err)
return nil, err
}
bodysize := readInt32(buf)
bodysize := int(binary.BigEndian.Uint32(temp[:4]))
var body []byte
if bodysize > 0 {
body, err = readBytes(buf, int(bodysize))
body, err = readBytes(buf, bodysize)
} else {
body = make([]byte, 0)
}
Expand All @@ -329,12 +338,6 @@ func DecodeGzipBody(body []byte) []byte {
return ret
}

func readInt32(buf io.Reader) int32 {
var i int32
binary.Read(buf, binary.BigEndian, &i)
return i
}

func readBytes(buf *bufio.Reader, size int) ([]byte, error) {
tempbytes := make([]byte, size)
var s, n int = 0, 0
Expand Down
6 changes: 4 additions & 2 deletions protocol/motanProtocol_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol

import (
"bufio"
"fmt"
"testing"
)
Expand Down Expand Up @@ -133,8 +134,9 @@ func TestEncode(t *testing.T) {
body := []byte("testbody")
msg := &Message{Header: h, Metadata: meta, Body: body}
ebytes := msg.Encode()

fmt.Println("len:", ebytes.Len())
newMsg := Decode(ebytes)
newMsg, err := Decode(bufio.NewReader(ebytes))
if newMsg == nil {
t.Fatalf("encode message fail")
}
Expand All @@ -153,7 +155,7 @@ func TestEncode(t *testing.T) {
msg.Header.SetGzip(true)
msg.Body, _ = EncodeGzip([]byte("gzip encode"))
b := msg.Encode()
newMsg = Decode(b)
newMsg, _ = Decode(bufio.NewReader(b))
// should not decode gzip
if !newMsg.Header.IsGzip() {
t.Fatalf("encode message fail")
Expand Down
Loading

0 comments on commit 8e9686f

Please sign in to comment.