Skip to content

Commit

Permalink
Merge pull request fatedier#886 from fatedier/websocket
Browse files Browse the repository at this point in the history
Connect protocol support websocket
  • Loading branch information
fatedier authored Aug 10, 2018
2 parents ca88b07 + 0762302 commit f14ed87
Show file tree
Hide file tree
Showing 12 changed files with 1,480 additions and 57 deletions.
7 changes: 4 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion conf/frpc_full.ini
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ user = your_name
login_fail_exit = true

# communication protocol used to connect to server
# now it supports tcp and kcp, default is tcp
# now it supports tcp and kcp and websocket, default is tcp
protocol = tcp

# specify a dns server, so frpc will use this instead of default one
Expand Down
7 changes: 4 additions & 3 deletions models/config/client_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,10 @@ func UnmarshalClientConfFromIni(defaultCfg *ClientCommonConf, content string) (c
}

if tmpStr, ok = conf.Get("common", "protocol"); ok {
// Now it only support tcp and kcp.
if tmpStr != "kcp" {
tmpStr = "tcp"
// Now it only support tcp and kcp and websocket.
if tmpStr != "tcp" && tmpStr != "kcp" && tmpStr != "websocket" {
err = fmt.Errorf("Parse conf error: invalid protocol")
return
}
cfg.Protocol = tmpStr
}
Expand Down
32 changes: 22 additions & 10 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"bytes"
"fmt"
"io/ioutil"
"net"
Expand Down Expand Up @@ -53,6 +54,9 @@ type Service struct {
// Accept connections using kcp
kcpListener frpNet.Listener

// Accept connections using websocket
websocketListener frpNet.Listener

// For https proxies, route requests to different clients by hostname and other infomation
VhostHttpsMuxer *vhost.HttpsMuxer

Expand Down Expand Up @@ -109,9 +113,6 @@ func NewService() (svr *Service, err error) {
if cfg.BindPort == cfg.VhostHttpsPort {
httpsMuxOn = true
}
if httpMuxOn || httpsMuxOn {
svr.muxer = mux.NewMux()
}
}

// Listen for accepting connections from client.
Expand All @@ -120,10 +121,11 @@ func NewService() (svr *Service, err error) {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
if svr.muxer != nil {
go svr.muxer.Serve(ln)
ln = svr.muxer.DefaultListener()
}

svr.muxer = mux.NewMux(ln)
go svr.muxer.Serve()
ln = svr.muxer.DefaultListener()

svr.listener = frpNet.WrapLogListener(ln)
log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort)

Expand All @@ -137,6 +139,13 @@ func NewService() (svr *Service, err error) {
log.Info("frps kcp listen on udp %s:%d", cfg.BindAddr, cfg.KcpBindPort)
}

// Listen for accepting connections from client using websocket protocol.
websocketPrefix := []byte("GET " + frpNet.FrpWebsocketPath)
websocketLn := svr.muxer.Listen(0, uint32(len(websocketPrefix)), func(data []byte) bool {
return bytes.Equal(data, websocketPrefix)
})
svr.websocketListener = frpNet.NewWebsocketListener(websocketLn)

// Create http vhost muxer.
if cfg.VhostHttpPort > 0 {
rp := vhost.NewHttpReverseProxy(vhost.HttpReverseProxyOptions{
Expand All @@ -151,7 +160,7 @@ func NewService() (svr *Service, err error) {
}
var l net.Listener
if httpMuxOn {
l = svr.muxer.ListenHttp(0)
l = svr.muxer.ListenHttp(1)
} else {
l, err = net.Listen("tcp", address)
if err != nil {
Expand All @@ -167,7 +176,7 @@ func NewService() (svr *Service, err error) {
if cfg.VhostHttpsPort > 0 {
var l net.Listener
if httpsMuxOn {
l = svr.muxer.ListenHttps(0)
l = svr.muxer.ListenHttps(1)
} else {
l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort))
if err != nil {
Expand Down Expand Up @@ -206,6 +215,7 @@ func NewService() (svr *Service, err error) {
}
log.Info("Dashboard listen on %s:%d", cfg.DashboardAddr, cfg.DashboardPort)
}

return
}

Expand All @@ -216,8 +226,10 @@ func (svr *Service) Run() {
if g.GlbServerCfg.KcpBindPort > 0 {
go svr.HandleListener(svr.kcpListener)
}
svr.HandleListener(svr.listener)

go svr.HandleListener(svr.websocketListener)

svr.HandleListener(svr.listener)
}

func (svr *Service) HandleListener(l frpNet.Listener) {
Expand Down
100 changes: 66 additions & 34 deletions utils/net/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,45 +96,34 @@ func (conn *WrapReadWriteCloserConn) SetWriteDeadline(t time.Time) error {
return &net.OpError{Op: "set", Net: "wrap", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
}

func ConnectServer(protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServer(addr)
case "kcp":
kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3)
if errRet != nil {
err = errRet
return
}
kcpConn.SetStreamMode(true)
kcpConn.SetWriteDelay(true)
kcpConn.SetNoDelay(1, 20, 2, 1)
kcpConn.SetWindowSize(128, 512)
kcpConn.SetMtu(1350)
kcpConn.SetACKNoDelay(false)
kcpConn.SetReadBuffer(4194304)
kcpConn.SetWriteBuffer(4194304)
c = WrapConn(kcpConn)
return
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
type CloseNotifyConn struct {
net.Conn
log.Logger

// 1 means closed
closeFlag int32

closeFn func()
}

// closeFn will be only called once
func WrapCloseNotifyConn(c net.Conn, closeFn func()) Conn {
return &CloseNotifyConn{
Conn: c,
Logger: log.NewPrefixLogger(""),
closeFn: closeFn,
}
}

func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
var conn net.Conn
if conn, err = gnet.DialTcpByProxy(proxyUrl, addr); err != nil {
return
func (cc *CloseNotifyConn) Close() (err error) {
pflag := atomic.SwapInt32(&cc.closeFlag, 1)
if pflag == 0 {
err = cc.Close()
if cc.closeFn != nil {
cc.closeFn()
}
return WrapConn(conn), nil
case "kcp":
// http proxy is not supported for kcp
return ConnectServer(protocol, addr)
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
return
}

type StatsConn struct {
Expand Down Expand Up @@ -175,3 +164,46 @@ func (statsConn *StatsConn) Close() (err error) {
}
return
}

func ConnectServer(protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
return ConnectTcpServer(addr)
case "kcp":
kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3)
if errRet != nil {
err = errRet
return
}
kcpConn.SetStreamMode(true)
kcpConn.SetWriteDelay(true)
kcpConn.SetNoDelay(1, 20, 2, 1)
kcpConn.SetWindowSize(128, 512)
kcpConn.SetMtu(1350)
kcpConn.SetACKNoDelay(false)
kcpConn.SetReadBuffer(4194304)
kcpConn.SetWriteBuffer(4194304)
c = WrapConn(kcpConn)
return
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}

func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn, err error) {
switch protocol {
case "tcp":
var conn net.Conn
if conn, err = gnet.DialTcpByProxy(proxyUrl, addr); err != nil {
return
}
return WrapConn(conn), nil
case "kcp":
// http proxy is not supported for kcp
return ConnectServer(protocol, addr)
case "websocket":
return ConnectWebsocketServer(addr)
default:
return nil, fmt.Errorf("unsupport protocol: %s", protocol)
}
}
105 changes: 105 additions & 0 deletions utils/net/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package net

import (
"errors"
"fmt"
"net"
"net/http"
"net/url"
"time"

"github.com/fatedier/frp/utils/log"

"golang.org/x/net/websocket"
)

var (
ErrWebsocketListenerClosed = errors.New("websocket listener closed")
)

const (
FrpWebsocketPath = "/~!frp"
)

type WebsocketListener struct {
net.Addr
ln net.Listener
accept chan Conn
log.Logger

server *http.Server
httpMutex *http.ServeMux
}

// ln: tcp listener for websocket connections
func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
wl = &WebsocketListener{
Addr: ln.Addr(),
accept: make(chan Conn),
Logger: log.NewPrefixLogger(""),
}

muxer := http.NewServeMux()
muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) {
notifyCh := make(chan struct{})
conn := WrapCloseNotifyConn(c, func() {
close(notifyCh)
})
wl.accept <- conn
<-notifyCh
}))

wl.server = &http.Server{
Addr: ln.Addr().String(),
Handler: muxer,
}

go wl.server.Serve(ln)
return
}

func ListenWebsocket(bindAddr string, bindPort int) (*WebsocketListener, error) {
tcpLn, err := net.Listen("tcp", fmt.Sprintf("%s:%d", bindAddr, bindPort))
if err != nil {
return nil, err
}
l := NewWebsocketListener(tcpLn)
return l, nil
}

func (p *WebsocketListener) Accept() (Conn, error) {
c, ok := <-p.accept
if !ok {
return nil, ErrWebsocketListenerClosed
}
return c, nil
}

func (p *WebsocketListener) Close() error {
return p.server.Close()
}

// addr: domain:port
func ConnectWebsocketServer(addr string) (Conn, error) {
addr = "ws://" + addr + FrpWebsocketPath
uri, err := url.Parse(addr)
if err != nil {
return nil, err
}

origin := "http://" + uri.Host
cfg, err := websocket.NewConfig(addr, origin)
if err != nil {
return nil, err
}
cfg.Dialer = &net.Dialer{
Timeout: 10 * time.Second,
}

conn, err := websocket.DialConfig(cfg)
if err != nil {
return nil, err
}
c := WrapConn(conn)
return c, nil
}
10 changes: 4 additions & 6 deletions vendor/github.com/fatedier/golib/net/mux/mux.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f14ed87

Please sign in to comment.