Skip to content

Commit

Permalink
Merge pull request weibocom#81 from weibocom/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
rayzhang0603 authored Jul 24, 2018
2 parents 2f82b3f + 55d4a38 commit 5a5ec6d
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 31 deletions.
11 changes: 7 additions & 4 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,19 @@ type agentMessageHandler struct {
}

func (a *agentMessageHandler) Call(request motan.Request) (res motan.Response) {
if request.GetAttachment(mpro.MSource) == "" {
application := a.agent.agentURL.GetParam(motan.ApplicationKey, "")
request.SetAttachment(mpro.MSource, application)
}
version := "0.1"
if request.GetAttachment(mpro.MVersion) != "" {
version = request.GetAttachment(mpro.MVersion)
}
ck := getClusterKey(request.GetAttachment(mpro.MGroup), version, request.GetAttachment(mpro.MProxyProtocol), request.GetAttachment(mpro.MPath))
if motanCluster := a.agent.clustermap[ck]; motanCluster != nil {
if request.GetAttachment(mpro.MSource) == "" {
application := motanCluster.GetURL().GetParam(motan.ApplicationKey, "")
if application == "" {
application = a.agent.agentURL.GetParam(motan.ApplicationKey, "")
}
request.SetAttachment(mpro.MSource, application)
}
res = motanCluster.Call(request)
if res == nil {
vlog.Warningf("motanCluster Call return nil. cluster:%s\n", ck)
Expand Down
16 changes: 6 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"fmt"
"sync"

cluster "github.com/weibocom/motan-go/cluster"
"github.com/weibocom/motan-go/cluster"
motan "github.com/weibocom/motan-go/core"
mpro "github.com/weibocom/motan-go/protocol"
)
Expand Down Expand Up @@ -75,19 +75,15 @@ func (c *Client) BaseGo(req motan.Request, reply interface{}, done chan *motan.A
func (c *Client) BuildRequest(method string, args []interface{}) motan.Request {
req := &motan.MotanRequest{Method: method, ServiceName: c.url.Path, Arguments: args, Attachment: motan.NewStringMap(motan.DefaultAttachmentSize)}
version := c.url.GetParam(motan.VersionKey, "")
if version != "" {
req.SetAttachment(mpro.MVersion, version)
}
req.SetAttachment(mpro.MVersion, version)
module := c.url.GetParam(motan.ModuleKey, "")
if module != "" {
req.SetAttachment(mpro.MModule, module)
}
req.SetAttachment(mpro.MModule, module)
application := c.url.GetParam(motan.ApplicationKey, "")
if application != "" {
req.SetAttachment(mpro.MSource, application)
if application == "" {
application = c.cluster.Context.ClientURL.GetParam(motan.ApplicationKey, "")
}
req.SetAttachment(mpro.MSource, application)
req.SetAttachment(mpro.MGroup, c.url.Group)

return req
}

Expand Down
6 changes: 6 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package core

import "time"

//--------------all global public constants--------------
// exception type
const (
Expand Down Expand Up @@ -49,3 +51,7 @@ const (
NodeTypeReferer = "referer"
NodeTypeAgent = "agent"
)

const (
DefaultWriteTimeout = 5 * time.Second
)
8 changes: 4 additions & 4 deletions core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Context struct {
Config *cfg.Config
RegistryURLs map[string]*URL
RefersURLs map[string]*URL
BasicRefers map[string]*URL
BasicReferURLs map[string]*URL
ServiceURLs map[string]*URL
BasicServiceURLs map[string]*URL
AgentURL *URL
Expand Down Expand Up @@ -118,7 +118,7 @@ func confToURL(urlInfo map[interface{}]interface{}) *URL {
func (c *Context) Initialize() {
c.RegistryURLs = make(map[string]*URL)
c.RefersURLs = make(map[string]*URL)
c.BasicRefers = make(map[string]*URL)
c.BasicReferURLs = make(map[string]*URL)
c.ServiceURLs = make(map[string]*URL)
c.BasicServiceURLs = make(map[string]*URL)
if c.ConfigFile == "" { // use flag as default config file name
Expand Down Expand Up @@ -277,7 +277,7 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
basicURLs = c.BasicServiceURLs
basicKey = basicServiceKey
} else if section == refersSection {
basicURLs = c.BasicRefers
basicURLs = c.BasicReferURLs
basicKey = basicReferKey
}
for key, url := range urls {
Expand Down Expand Up @@ -319,7 +319,7 @@ func (c *Context) parseRefers() {
}

func (c *Context) parseBasicRefers() {
c.BasicRefers = c.confToURLs(basicRefersSection)
c.BasicReferURLs = c.confToURLs(basicRefersSection)
}

func (c *Context) parseServices() {
Expand Down
14 changes: 10 additions & 4 deletions endpoint/motanEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"errors"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
Expand All @@ -26,6 +25,8 @@ var (
ErrRecvRequestTimeout = fmt.Errorf("Timeout err: receive request timeout")

defaultAsyncResponse = &motan.MotanResponse{Attachment: motan.NewStringMap(motan.DefaultAttachmentSize), RPCContext: &motan.RPCContext{AsyncCall: true}}

panicErr = errors.New("panic error")
)

type MotanEndpoint struct {
Expand Down Expand Up @@ -255,7 +256,7 @@ type Channel struct {
address string

// connection
conn io.ReadWriteCloser
conn net.Conn
bufRead *bufio.Reader

// send
Expand Down Expand Up @@ -424,7 +425,9 @@ func (c *Channel) IsClosed() bool {
}

func (c *Channel) recv() {
defer motan.HandlePanic(nil)
defer motan.HandlePanic(func() {
c.closeOnErr(panicErr)
})
if err := c.recvLoop(); err != nil {
c.closeOnErr(err)
}
Expand All @@ -450,12 +453,15 @@ func (c *Channel) recvLoop() error {
}

func (c *Channel) send() {
defer motan.HandlePanic(nil)
defer motan.HandlePanic(func() {
c.closeOnErr(panicErr)
})
for {
select {
case ready := <-c.sendCh:
if ready.data != nil {
// TODO need async?
c.conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
sent := 0
for sent < len(ready.data) {
n, err := c.conn.Write(ready.data[sent:])
Expand Down
25 changes: 16 additions & 9 deletions server/motanserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/log"
mpro "github.com/weibocom/motan-go/protocol"
"time"
)

type MotanServer struct {
Expand Down Expand Up @@ -81,9 +82,8 @@ func (m *MotanServer) run() {
}

func (m *MotanServer) handleConn(conn net.Conn) {
defer motan.HandlePanic(func() {
conn.Close()
})
defer conn.Close()
defer motan.HandlePanic(nil)
buf := bufio.NewReader(conn)
for {
request, err := mpro.Decode(buf)
Expand All @@ -100,6 +100,12 @@ func (m *MotanServer) handleConn(conn net.Conn) {
func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
defer motan.HandlePanic(nil)
request.Header.SetProxy(m.proxy)
var ip string
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip = ta.IP.String()
} else {
ip = getRemoteIP(conn.RemoteAddr().String())
}
// TODO request , response reuse
var res *mpro.Message
if request.Header.IsHeartbeat() {
Expand All @@ -112,11 +118,7 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
vlog.Errorf("motan server convert to motan request fail. rid :%d, service: %s, method:%s,err:%s\n", request.Header.RequestID, request.Metadata.LoadOrEmpty(mpro.MPath), request.Metadata.LoadOrEmpty(mpro.MMethod), err.Error())
res = mpro.BuildExceptionResponse(request.Header.RequestID, mpro.ExceptionToJSON(&motan.Exception{ErrCode: 500, ErrMsg: "deserialize fail. err:" + err.Error() + " method:" + request.Metadata.LoadOrEmpty(mpro.MMethod), ErrType: motan.ServiceException}))
} else {
if ta, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
req.SetAttachment(motan.HostKey, ta.IP.String())
} else {
req.SetAttachment(motan.HostKey, getRemoteIP(conn.RemoteAddr().String()))
}
req.SetAttachment(motan.HostKey, ip)
req.GetRPCContext(true).ExtFactory = m.extFactory
mres = m.handler.Call(req)
//TODO oneway
Expand All @@ -133,7 +135,12 @@ func (m *MotanServer) processReq(request *mpro.Message, conn net.Conn) {
}
}
resbuf := res.Encode()
conn.Write(resbuf.Bytes())
conn.SetWriteDeadline(time.Now().Add(motan.DefaultWriteTimeout))
_, err := conn.Write(resbuf.Bytes())
if err != nil {
vlog.Errorf("connection will close. ip: %s, cause:%s\n", ip, err.Error())
conn.Close()
}
}

func getRemoteIP(address string) string {
Expand Down

0 comments on commit 5a5ec6d

Please sign in to comment.