Skip to content

Commit

Permalink
websocket: update muxer for websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier committed Aug 10, 2018
1 parent 64136a3 commit 7793f55
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 152 deletions.
7 changes: 4 additions & 3 deletions models/config/client_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ func UnmarshalClientConfFromIni(defaultCfg *ClientCommonConf, content string) (c
}

if tmpStr, ok = conf.Get("common", "protocol"); ok {
// Now it only support tcp and kcp.
if tmpStr != "kcp" && tmpStr != "websocket" {
tmpStr = "tcp"
// Now it only support tcp and kcp and websocket.
if tmpStr != "tcp" && tmpStr != "kcp" && tmpStr != "websocket" {
err = fmt.Errorf("Parse conf error: invalid protocol")
return
}
cfg.Protocol = tmpStr
}
Expand Down
53 changes: 16 additions & 37 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package server

import (
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"time"

"github.com/fatedier/frp/assets"
Expand Down Expand Up @@ -139,6 +139,13 @@ func NewService() (svr *Service, err error) {
log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.KcpBindPort)
}

// Listen for accepting connections from client using websocket protocol.
websocketPrefix := []byte("GET /%23frp")
websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
return bytes.Equal(data, websocketPrefix)
})
svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)

// Create http vhost muxer.
if cfg.VhostHttpPort > 0 {
rp := vhost.NewHttpReverseProxy()
Expand All @@ -150,7 +157,9 @@ func NewService() (svr *Service, err error) {
Handler: rp,
}
var l net.Listener
if !httpMuxOn {
if httpMuxOn {
l = svr.muxer.ListenHttp(1)
} else {
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
Expand All @@ -165,7 +174,7 @@ func NewService() (svr *Service, err error) {
if cfg.VhostHttpsPort > 0 {
var l net.Listener
if httpsMuxOn {
l = svr.muxer.ListenHttps(0)
l = svr.muxer.ListenHttps(1)
} else {
l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort))
if err != nil {
Expand Down Expand Up @@ -205,37 +214,6 @@ func NewService() (svr *Service, err error) {
log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
}

if !httpMuxOn {
svr.websocketListener, err = frpNet.NewWebsocketListener(svr.muxer.ListenHttp(0), nil)
return
}

// server := &http.Server{}
if httpMuxOn {
rp := svr.httpReverseProxy
svr.websocketListener, err = frpNet.NewWebsocketListener(svr.muxer.ListenHttp(0),
func(w http.ResponseWriter, req *http.Request) bool {
domain := getHostFromAddr(req.Host)
location := req.URL.Path
headers := rp.GetHeaders(domain, location)
if headers == nil {
return true
}
rp.ServeHTTP(w, req)
return false
})
}

return
}

func getHostFromAddr(addr string) (host string) {
strs := strings.Split(addr, ":")
if len(strs) > 1 {
host = strs[0]
} else {
host = addr
}
return
}

Expand All @@ -246,9 +224,9 @@ func (svr *Service) Run() {
if g.GlbServerCfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener)
}
if svr.websocketListener != nil {
go svr.HandleListener(svr.websocketListener)
}

go svr.HandleListener(svr.websocketListener)

svr.HandleListener(svr.listener)
}

Expand All @@ -260,6 +238,7 @@ func (svr *Service) HandleListener(l frpNet.Listener) {
log.Warn("Listener for incoming connections from client closed")
return
}

// Start a new goroutine for dealing connections.
go func(frpConn frpNet.Conn) {
dealFn := func(conn frpNet.Conn) {
Expand Down
102 changes: 66 additions & 36 deletions utils/net/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,47 +96,34 @@ func (conn *WrapReadWriteCloserConn) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "wrap", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}

func ConnectServer(protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServer(addr)
case "kcp":
kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3)
if errRet != nil {
err = errRet
return
}
kcpConn.SetStreamMode(true)
kcpConn.SetWriteDelay(true)
kcpConn.SetNoDelay(1, 20, 2, 1)
kcpConn.SetWindowSize(128, 512)
kcpConn.SetMtu(1350)
kcpConn.SetACKNoDelay(false)
kcpConn.SetReadBuffer(4194304)
kcpConn.SetWriteBuffer(4194304)
c = WrapConn(kcpConn)
return
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
type CloseNotifyConn struct {
net.Conn
log.Logger

// 1 means closed
closeFlag int32

closeFn func()
}

// closeFn will be only called once
func WrapCloseNotifyConn(c net.Conn, closeFn func()) Conn {
return &CloseNotifyConn{
Conn: c,
Logger: log.NewPrefixLogger(""),
closeFn: closeFn,
}
}

func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
var conn net.Conn
if conn, err = gnet.DialTcpByProxy(proxyUrl, addr); err != nil {
return
func (cc *CloseNotifyConn) Close() (err error) {
pflag := atomic.SwapInt32(&cc.closeFlag, 1)
if pflag == 0 {
err = cc.Close()
if cc.closeFn != nil {
cc.closeFn()
}
return WrapConn(conn), nil
case "kcp":
// http proxy is not supported for kcp
return ConnectServer(protocol, addr)
case "websocket":
return ConnectWebsocketServer(addr)
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
return
}

type StatsConn struct {
Expand Down Expand Up @@ -177,3 +164,46 @@ func (statsConn *StatsConn) Close() (err error) {
}
return
}

func ConnectServer(protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServer(addr)
case "kcp":
kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3)
if errRet != nil {
err = errRet
return
}
kcpConn.SetStreamMode(true)
kcpConn.SetWriteDelay(true)
kcpConn.SetNoDelay(1, 20, 2, 1)
kcpConn.SetWindowSize(128, 512)
kcpConn.SetMtu(1350)
kcpConn.SetACKNoDelay(false)
kcpConn.SetReadBuffer(4194304)
kcpConn.SetWriteBuffer(4194304)
c = WrapConn(kcpConn)
return
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}

func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
var conn net.Conn
if conn, err = gnet.DialTcpByProxy(proxyUrl, addr); err != nil {
return
}
return WrapConn(conn), nil
case "kcp":
// http proxy is not supported for kcp
return ConnectServer(protocol, addr)
case "websocket":
return ConnectWebsocketServer(addr)
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}
Loading

0 comments on commit 7793f55

Please sign in to comment.