Skip to content

Commit

Permalink
移除httpmeta系统, 简化http操作
Browse files Browse the repository at this point in the history
修复tcp session竞态问题
  • Loading branch information
davyxu committed May 29, 2019
1 parent ee5e45a commit ffe524b
Show file tree
Hide file tree
Showing 9 changed files with 141 additions and 307 deletions.
134 changes: 0 additions & 134 deletions metahttp.go

This file was deleted.

13 changes: 0 additions & 13 deletions peer/http/acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"html/template"
"net"
"net/http"
"reflect"
"strings"
"time"
)
Expand Down Expand Up @@ -117,18 +116,6 @@ func (self *httpAcceptor) ServeHTTP(res http.ResponseWriter, req *http.Request)
var err error
var fileHandled bool

// 请求转消息,文件处理
meta := cellnet.HttpMetaByMethodURL(req.Method, req.URL.Path)
if meta != nil {

// 直接打开页面时,无需创建消息
if meta.RequestType != nil {
msg = reflect.New(meta.RequestType).Interface()

err = meta.RequestCodec.Decode(req, msg)
}
}

if err != nil {
goto OnError
}
Expand Down
73 changes: 44 additions & 29 deletions peer/http/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"fmt"
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/codec"
"github.com/davyxu/cellnet/peer"
"io"
"net/http"
Expand All @@ -24,58 +25,72 @@ func (self *httpConnector) Stop() {

}

func (self *httpConnector) Request(method, path string, raw interface{}) (interface{}, error) {
func getCodec(codecName string) cellnet.Codec {

// 获取消息元信息
meta := cellnet.HttpMetaByMethodURL(method, path)
if meta == nil {
return nil, cellnet.NewErrorContext("msg not found", raw)
if codecName == "" {
codecName = "httpjson"
}

// 将消息编码为字节数组
data, err := meta.RequestCodec.Encode(raw, nil)
return codec.MustGetCodec(codecName)
}

log.Debugf("#http.send(%s) '%s' %s | Message(%s) %s",
self.Name(),
meta.Method,
meta.Path,
meta.RequestTypeName(),
cellnet.MessageToString(raw))
func getTypeName(msg interface{}) string {
if msg == nil {
return ""
}

url := fmt.Sprintf("http://%s%s", self.Address(), meta.Path)
return reflect.TypeOf(msg).Elem().Name()
}

req, err := http.NewRequest(meta.Method, url, data.(io.Reader))
func (self *httpConnector) Request(method, path string, param *cellnet.HTTPRequest) error {

// 将消息编码为字节数组
reqCodec := getCodec(param.REQCodecName)
data, err := reqCodec.Encode(param.REQMsg, nil)

if log.IsDebugEnabled() {
log.Debugf("#http.send(%s) '%s' %s | Message(%s) %s",
self.Name(),
method,
path,
getTypeName(param.REQMsg),
cellnet.MessageToString(param.REQMsg))
}

url := fmt.Sprintf("http://%s%s", self.Address(), path)

req, err := http.NewRequest(method, url, data.(io.Reader))

if err != nil {
return nil, err
return err
}

mimeType := meta.RequestCodec.(interface {
mimeType := reqCodec.(interface {
MimeType() string
}).MimeType()

req.Header.Set("Content-Type", mimeType)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
return err
}

defer resp.Body.Close()

msg := reflect.New(meta.ResponseType).Interface()
err = getCodec(param.ACKCodecName).Decode(resp.Body, param.ACKMsg)

err = meta.ResponseCodec.Decode(resp.Body, msg)

log.Debugf("#http.recv(%s) '%s' %s | [%d] Message(%s) %s",
self.Name(),
resp.Request.Method,
meta.Path,
resp.StatusCode,
meta.ResponseTypeName(),
cellnet.MessageToString(msg))
if log.IsDebugEnabled() {
log.Debugf("#http.recv(%s) '%s' %s | [%d] Message(%s) %s",
self.Name(),
resp.Request.Method,
path,
resp.StatusCode,
getTypeName(param.ACKMsg),
cellnet.MessageToString(param.ACKMsg))
}

return msg, err
return err
}

func (self *httpConnector) TypeName() string {
Expand Down
11 changes: 7 additions & 4 deletions peer/tcp/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ func (self *tcpConnector) SetReconnectDuration(v time.Duration) {
}

func (self *tcpConnector) Port() int {
if self.defaultSes.conn == nil {

conn := self.defaultSes.Conn()

if conn == nil {
return 0
}

return self.defaultSes.conn.LocalAddr().(*net.TCPAddr).Port
return conn.LocalAddr().(*net.TCPAddr).Port
}

const reportConnectFailedLimitTimes = 3
Expand All @@ -96,7 +99,7 @@ func (self *tcpConnector) connect(address string) {
// 尝试用Socket连接地址
conn, err := net.Dial("tcp", address)

self.defaultSes.conn = conn
self.defaultSes.setConn(conn)

// 发生错误时退出
if err != nil {
Expand Down Expand Up @@ -138,7 +141,7 @@ func (self *tcpConnector) connect(address string) {

self.sesEndSignal.Wait()

self.defaultSes.conn = nil
self.defaultSes.setConn(nil)

// 没重连就退出/主动退出
if self.IsStopping() || self.ReconnectDuration() == 0 {
Expand Down
36 changes: 27 additions & 9 deletions peer/tcp/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ type tcpSession struct {
pInterface cellnet.Peer

// Socket原始连接
conn net.Conn
conn net.Conn
connGuard sync.RWMutex

// 退出同步器
exitSync sync.WaitGroup
Expand All @@ -34,13 +35,25 @@ type tcpSession struct {
closing int64
}

func (self *tcpSession) setConn(conn net.Conn) {
self.connGuard.Lock()
self.conn = conn
self.connGuard.Unlock()
}

func (self *tcpSession) Conn() net.Conn {
self.connGuard.RLock()
defer self.connGuard.RUnlock()
return self.conn
}

func (self *tcpSession) Peer() cellnet.Peer {
return self.pInterface
}

// 取原始连接
func (self *tcpSession) Raw() interface{} {
return self.conn
return self.Conn()
}

func (self *tcpSession) Close() {
Expand All @@ -50,13 +63,15 @@ func (self *tcpSession) Close() {
return
}

if self.conn != nil {
conn := self.Conn()

if conn != nil {
// 关闭读
con := self.conn.(*net.TCPConn)
tcpConn := conn.(*net.TCPConn)
// 关闭读
con.CloseRead()
tcpConn.CloseRead()
// 手动读超时
con.SetReadDeadline(time.Now())
tcpConn.SetReadDeadline(time.Now())
}
}

Expand Down Expand Up @@ -86,7 +101,7 @@ func (self *tcpSession) protectedReadMessage() (msg interface{}, err error) {

if err := recover(); err != nil {
log.Errorf("IO panic: %s", err)
self.conn.Close()
self.Conn().Close()
}

}()
Expand All @@ -105,7 +120,7 @@ func (self *tcpSession) recvLoop() {
capturePanic = i.CaptureIOPanic()
}

for self.conn != nil {
for self.Conn() != nil {

var msg interface{}
var err error
Expand Down Expand Up @@ -161,7 +176,10 @@ func (self *tcpSession) sendLoop() {
}

// 完整关闭
self.conn.Close()
conn := self.Conn()
if conn != nil {
conn.Close()
}

// 通知完成
self.exitSync.Done()
Expand Down
Loading

0 comments on commit ffe524b

Please sign in to comment.