Skip to content

Commit

Permalink
merge master from lujiajing1126:master
Browse files Browse the repository at this point in the history
  • Loading branch information
souriki committed Jun 5, 2017
1 parent 4189bac commit 8bd3273
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 99 deletions.
75 changes: 40 additions & 35 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ import (
"encoding/xml"
"fmt"
"net/http"
"net/url"
neturl "net/url"
"os"
"strings"
"bytes"
"sync"
"time"

"github.com/gogap/errors"
"github.com/mreiferson/go-httpclient"
"github.com/valyala/fasthttp"
)

const (
DefaultQueueQPSLimit int32 = 2000
DefaultTopicQPSLimit int32 = 2000
DefaultDNSTTL int32 = 10
)

const (
Expand Down Expand Up @@ -51,7 +53,7 @@ const (
)

type MNSClient interface {
Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error)
Send(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error)
SetProxy(url string)

getAccountID() (accountId string)
Expand All @@ -60,10 +62,10 @@ type MNSClient interface {

type aliMNSClient struct {
Timeout int64
url string
url *neturl.URL
credential Credential
accessKeyId string
client *http.Client
client *fasthttp.Client
proxyURL string

accountId string
Expand All @@ -72,8 +74,8 @@ type aliMNSClient struct {
clientLocker sync.Mutex
}

func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient {
if url == "" {
func NewAliMNSClient(inputUrl, accessKeyId, accessKeySecret string) MNSClient {
if inputUrl == "" {
panic("ali-mns: message queue url is empty")
}

Expand All @@ -82,10 +84,14 @@ func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient {
cli := new(aliMNSClient)
cli.credential = credential
cli.accessKeyId = accessKeyId
cli.url = url

var err error
if cli.url, err = neturl.Parse(inputUrl); err != nil {
panic("err parse url")
}

// 1. parse region and accountid
pieces := strings.Split(url, ".")
pieces := strings.Split(inputUrl, ".")
if len(pieces) != 5 {
panic("ali-mns: message queue url is invalid")
}
Expand All @@ -101,7 +107,8 @@ func NewAliMNSClient(url, accessKeyId, accessKeySecret string) MNSClient {
}

// 2. now init http client
cli.initClient()
cli.initFastHttpClient()

return cli
}

Expand All @@ -121,8 +128,7 @@ func (p *aliMNSClient) SetProxy(url string) {
p.proxyURL = url
}

func (p *aliMNSClient) initClient() {

func (p *aliMNSClient) initFastHttpClient() {
p.clientLocker.Lock()
defer p.clientLocker.Unlock()

Expand All @@ -134,19 +140,12 @@ func (p *aliMNSClient) initClient() {

timeout := time.Second * time.Duration(timeoutInt)

transport := &httpclient.Transport{
Proxy: p.proxy,
ConnectTimeout: time.Second * 3,
RequestTimeout: timeout,
ResponseHeaderTimeout: timeout + time.Second,
}

p.client = &http.Client{Transport: transport}
p.client = &fasthttp.Client{ReadTimeout: timeout, WriteTimeout: timeout}
}

func (p *aliMNSClient) proxy(req *http.Request) (*url.URL, error) {
func (p *aliMNSClient) proxy(req *http.Request) (*neturl.URL, error) {
if p.proxyURL != "" {
return url.Parse(p.proxyURL)
return neturl.Parse(p.proxyURL)
}
return nil, nil
}
Expand All @@ -161,8 +160,9 @@ func (p *aliMNSClient) authorization(method Method, headers map[string]string, r
return
}

func (p *aliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (resp *http.Response, err error) {
func (p *aliMNSClient) Send(method Method, headers map[string]string, message interface{}, resource string) (*fasthttp.Response, error) {
var xmlContent []byte
var err error

if message == nil {
xmlContent = []byte{}
Expand All @@ -175,7 +175,7 @@ func (p *aliMNSClient) Send(method Method, headers map[string]string, message in
default:
if bXml, e := xml.Marshal(message); e != nil {
err = ERR_MARSHAL_MESSAGE_FAILED.New(errors.Params{"err": e})
return
return nil, err
} else {
xmlContent = bXml
}
Expand All @@ -196,31 +196,36 @@ func (p *aliMNSClient) Send(method Method, headers map[string]string, message in

if authHeader, e := p.authorization(method, headers, fmt.Sprintf("/%s", resource)); e != nil {
err = ERR_GENERAL_AUTH_HEADER_FAILED.New(errors.Params{"err": e})
return
return nil, err
} else {
headers[AUTHORIZATION] = authHeader
}

url := p.url + "/" + resource
var buffer bytes.Buffer
buffer.WriteString(p.url.String())
buffer.WriteString("/")
buffer.WriteString(resource)

postBodyReader := strings.NewReader(string(xmlContent))
url := buffer.String()

var req *http.Request
if req, err = http.NewRequest(string(method), url, postBodyReader); err != nil {
err = ERR_CREATE_NEW_REQUEST_FAILED.New(errors.Params{"err": err})
return
}
req := fasthttp.AcquireRequest()

req.SetRequestURI(url)
req.Header.SetMethod(string(method))
req.SetBody(xmlContent)

for header, value := range headers {
req.Header.Add(header, value)
}

if resp, err = p.client.Do(req); err != nil {
resp := fasthttp.AcquireResponse()

if err = p.client.Do(req,resp); err != nil {
err = ERR_SEND_REQUEST_FAILED.New(errors.Params{"err": err})
return
return nil , err
}

return
return resp, nil
}

func initMNSErrors() {
Expand Down
77 changes: 47 additions & 30 deletions example/queue_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"encoding/json"
"io/ioutil"
"fmt"
_ "net/http/pprof"
"log"
"net/http"

"github.com/souriki/ali_mns"
"github.com/gogap/logs"
Expand All @@ -16,6 +19,11 @@ type appConf struct {
}

func main() {
go func() {
log.Println(http.ListenAndServe("localhost:8080", nil))
}()


conf := appConf{}

if bFile, e := ioutil.ReadFile("app.conf"); e != nil {
Expand All @@ -37,6 +45,8 @@ func main() {


queueManager := ali_mns.NewMNSQueueManager(client)


err := queueManager.CreateSimpleQueue("test")

if err != nil && !ali_mns.ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR.IsEqual(err) {
Expand All @@ -45,42 +55,49 @@ func main() {
}

queue := ali_mns.NewMNSQueue("test", client)
ret, err := queue.SendMessage(msg)

if err != nil {
fmt.Println(err)
} else {
logs.Pretty("response:", ret)
}
for i := 1 ; i < 10000 ; i++ {
_, err := queue.SendMessage(msg)

endChan := make(chan int)
respChan := make(chan ali_mns.MessageReceiveResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
logs.Pretty("response:", resp)
logs.Debug("change the visibility: ", resp.ReceiptHandle)
if ret, e := queue.ChangeMessageVisibility(resp.ReceiptHandle, 5); e != nil {
fmt.Println(e)
} else {
logs.Pretty("visibility changed", ret)
logs.Debug("delete it now: ", ret.ReceiptHandle)
if e := queue.DeleteMessage(ret.ReceiptHandle); e != nil {
go func() {
fmt.Println(queue.QPSMonitor().QPS())
}()

if err != nil {
fmt.Println(err)
} else {
// logs.Pretty("response:", ret)
}

endChan := make(chan int)
respChan := make(chan ali_mns.MessageReceiveResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// logs.Pretty("response:", resp)
logs.Debug("change the visibility: ", resp.ReceiptHandle)
if ret, e := queue.ChangeMessageVisibility(resp.ReceiptHandle, 5); e != nil {
fmt.Println(e)
} else {
// logs.Pretty("visibility changed", ret)
logs.Debug("delete it now: ", ret.ReceiptHandle)
if e := queue.DeleteMessage(ret.ReceiptHandle); e != nil {
fmt.Println(e)
}
endChan <- 1
}
}
case err := <-errChan:
{
fmt.Println(err)
endChan <- 1
}
}
case err := <-errChan:
{
fmt.Println(err)
endChan <- 1
}
}
}()
}()

queue.ReceiveMessage(respChan, errChan, 30)
<-endChan
queue.ReceiveMessage(respChan, errChan, 30)
<-endChan
}
}
1 change: 1 addition & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type CreateQueueRequest struct {
MessageRetentionPeriod int32 `xml:"MessageRetentionPeriod,omitempty" json:"message_retention_period,omitempty"`
VisibilityTimeout int32 `xml:"VisibilityTimeout,omitempty" json:"visibility_timeout,omitempty"`
PollingWaitSeconds int32 `xml:"PollingWaitSeconds" json:"polling_wait_secods"`
Slices int32 `xml:"Slices" json:"slices"`
}

type CreateTopicRequest struct {
Expand Down
Loading

0 comments on commit 8bd3273

Please sign in to comment.