forked from weibocom/motan-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmeshClient.go
143 lines (128 loc) · 4.64 KB
/
meshClient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package motan
import (
"errors"
"strconv"
"strings"
"time"
"github.com/valyala/fasthttp"
"github.com/weibocom/motan-go/cluster"
"github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/endpoint"
mhttp "github.com/weibocom/motan-go/http"
"github.com/weibocom/motan-go/protocol"
)
const (
DefaultMeshRequestTimeout = 5 * time.Second
DefaultMeshAddress = "127.0.0.1:9981"
DefaultMeshSerialization = "simple"
)
const meshDirectRegistryKey = "mesh-registry"
type MeshClient struct {
requestTimeout time.Duration
application string
address string
serialization string
cluster *cluster.MotanCluster
httpClient *fasthttp.Client
}
func NewMeshClient() *MeshClient {
return &MeshClient{}
}
func (c *MeshClient) SetAddress(address string) {
c.address = address
}
func (c *MeshClient) SetRequestTimeout(requestTimeout time.Duration) {
c.requestTimeout = requestTimeout
}
func (c *MeshClient) SetSerialization(serialization string) {
c.serialization = serialization
}
func (c *MeshClient) SetApplication(application string) {
c.application = application
}
func (c *MeshClient) Initialize() {
if c.requestTimeout == 0 {
c.requestTimeout = DefaultMeshRequestTimeout
}
if c.address == "" {
c.address = DefaultMeshAddress
}
if c.serialization == "" {
c.serialization = DefaultMeshSerialization
}
c.httpClient = &fasthttp.Client{}
clusterURL := &core.URL{}
clusterURL.Protocol = endpoint.Motan2
clusterURL.PutParam(core.TimeOutKey, strconv.Itoa(int(c.requestTimeout/time.Millisecond)))
clusterURL.PutParam(core.ApplicationKey, c.application)
clusterURL.PutParam(core.ErrorCountThresholdKey, "0")
clusterURL.PutParam(core.RegistryKey, meshDirectRegistryKey)
clusterURL.PutParam(core.ConnectRetryIntervalKey, "5000")
clusterURL.PutParam(core.SerializationKey, c.serialization)
meshRegistryURL := &core.URL{}
meshRegistryURL.Protocol = "direct"
meshRegistryURL.PutParam(core.AddressKey, c.address)
context := &core.Context{}
context.RegistryURLs = make(map[string]*core.URL)
context.RegistryURLs[meshDirectRegistryKey] = meshRegistryURL
c.cluster = cluster.NewCluster(context, GetDefaultExtFactory(), clusterURL, false)
}
func (c *MeshClient) Destroy() {
c.cluster.Destroy()
}
func (c *MeshClient) BuildRequestWithGroup(service string, method string, args []interface{}, group string) core.Request {
request := &core.MotanRequest{Method: method, ServiceName: service, Arguments: args, Attachment: core.NewStringMap(core.DefaultAttachmentSize)}
request.RequestID = endpoint.GenerateRequestID()
request.SetAttachment(protocol.MSource, c.application)
request.SetAttachment(protocol.MGroup, group)
request.SetAttachment(protocol.MPath, request.GetServiceName())
return request
}
func (c *MeshClient) BuildRequest(service string, method string, args []interface{}) core.Request {
return c.BuildRequestWithGroup(service, method, args, "")
}
func (c *MeshClient) Call(service string, method string, args []interface{}, reply interface{}) error {
request := c.BuildRequest(service, method, args)
response := c.BaseCall(request, reply)
if response.GetException() != nil {
return errors.New(response.GetException().ErrMsg)
}
return nil
}
func (c *MeshClient) BaseCall(request core.Request, reply interface{}) core.Response {
rc := request.GetRPCContext(true)
rc.Reply = reply
response := c.cluster.Call(request)
// none http call direct response
if strings.Index(request.GetMethod(), "/") == -1 {
return response
}
// http request fallback to http call if mesh can not connect
if response.GetException() == nil || (response.GetException().ErrCode != core.ENoChannel && response.GetException().ErrCode != core.ENoEndpoints) {
return response
}
httpRequest := fasthttp.AcquireRequest()
httpResponse := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(httpRequest)
defer fasthttp.ReleaseResponse(httpResponse)
httpRequest.Header.Del("Host")
httpRequest.SetHost(request.GetServiceName())
httpRequest.URI().SetPath(request.GetMethod())
err := mhttp.MotanRequestToFasthttpRequest(request, httpRequest, "GET")
if err != nil {
return getDefaultResponse(request.GetRequestID(), "bad motan-http request: "+err.Error())
}
err = c.httpClient.Do(httpRequest, httpResponse)
if err != nil {
return getDefaultResponse(request.GetRequestID(), "do http request failed : "+err.Error())
}
response = &core.MotanResponse{RequestID: request.GetRequestID()}
mhttp.FasthttpResponseToMotanResponse(response, httpResponse)
if replyPointer, ok := rc.Reply.(*[]byte); ok {
*replyPointer = response.GetValue().([]byte)
}
if replyPointer, ok := rc.Reply.(*string); ok {
*replyPointer = string(response.GetValue().([]byte))
}
return response
}