Skip to content

Commit

Permalink
frps: vhost_http_port and vhost_https_port can be same with frps bind
Browse files Browse the repository at this point in the history
port
  • Loading branch information
fatedier committed May 6, 2018
1 parent f45283d commit 5db605c
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 25 deletions.
1 change: 1 addition & 0 deletions conf/frps_full.ini
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ kcp_bind_port = 7000
# proxy_bind_addr = 127.0.0.1

# if you want to support virtual host, you must set the http port for listening (optional)
# Note: http port and https port can be same with bind_port
vhost_http_port = 80
vhost_https_port = 443

Expand Down
56 changes: 45 additions & 11 deletions server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
frpNet "github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/net/mux"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/frp/utils/vhost"
Expand All @@ -41,6 +42,9 @@ var ServerService *Service

// Server service.
type Service struct {
// Dispatch connections to different handlers listen on same port.
muxer *mux.Mux

// Accept connections from client.
listener frpNet.Listener

Expand Down Expand Up @@ -88,12 +92,33 @@ func NewService() (svr *Service, err error) {
return
}

var (
httpMuxOn bool
httpsMuxOn bool
)
if cfg.BindAddr == cfg.ProxyBindAddr {
if cfg.BindPort == cfg.VhostHttpPort {
httpMuxOn = true
}
if cfg.BindPort == cfg.VhostHttpsPort {
httpsMuxOn = true
}
if httpMuxOn || httpsMuxOn {
svr.muxer = mux.NewMux()
}
}

// Listen for accepting connections from client.
svr.listener, err = frpNet.ListenTcp(cfg.BindAddr, cfg.BindPort)
ln, err := net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.BindAddr, cfg.BindPort))
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
if svr.muxer != nil {
go svr.muxer.Serve(ln)
ln = svr.muxer.DefaultListener()
}
svr.listener = frpNet.WrapLogListener(ln)
log.Info("frps tcp listen on %s:%d", cfg.BindAddr, cfg.BindPort)

// Listen for accepting connections from client using kcp protocol.
Expand All @@ -117,24 +142,33 @@ func NewService() (svr *Service, err error) {
Handler: rp,
}
var l net.Listener
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
return
if httpMuxOn {
l = svr.muxer.ListenHttp(0)
} else {
l, err = net.Listen("tcp", address)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
return
}
}
go server.Serve(l)
log.Info("http service listen on %s:%d", cfg.ProxyBindAddr, cfg.VhostHttpPort)
}

// Create https vhost muxer.
if cfg.VhostHttpsPort > 0 {
var l frpNet.Listener
l, err = frpNet.ListenTcp(cfg.ProxyBindAddr, cfg.VhostHttpsPort)
if err != nil {
err = fmt.Errorf("Create vhost https listener error, %v", err)
return
var l net.Listener
if httpsMuxOn {
l = svr.muxer.ListenHttps(0)
} else {
l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", cfg.ProxyBindAddr, cfg.VhostHttpsPort))
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
}
svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(l, 30*time.Second)

svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(frpNet.WrapLogListener(l), 30*time.Second)
if err != nil {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return
Expand Down
18 changes: 9 additions & 9 deletions utils/net/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -136,7 +135,6 @@ func ConnectServerByProxy(proxyUrl string, protocol string, addr string) (c Conn

type SharedConn struct {
Conn
sync.Mutex
buf *bytes.Buffer
}

Expand All @@ -149,22 +147,24 @@ func NewShareConn(conn Conn) (*SharedConn, io.Reader) {
return sc, io.TeeReader(conn, sc.buf)
}

func NewShareConnSize(conn Conn, bufSize int) (*SharedConn, io.Reader) {
sc := &SharedConn{
Conn: conn,
buf: bytes.NewBuffer(make([]byte, 0, bufSize)),
}
return sc, io.TeeReader(conn, sc.buf)
}

// Not thread safety.
func (sc *SharedConn) Read(p []byte) (n int, err error) {
sc.Lock()
if sc.buf == nil {
sc.Unlock()
return sc.Conn.Read(p)
}
sc.Unlock()
n, err = sc.buf.Read(p)

if err == io.EOF {
sc.Lock()
sc.buf = nil
sc.Unlock()
var n2 int
n2, err = sc.Conn.Read(p[n:])

n += n2
}
return
Expand Down
210 changes: 210 additions & 0 deletions utils/net/mux/mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package mux

import (
"fmt"
"io"
"net"
"sort"
"sync"
"time"

"github.com/fatedier/frp/utils/errors"
frpNet "github.com/fatedier/frp/utils/net"
)

const (
// DefaultTimeout is the default length of time to wait for bytes we need.
DefaultTimeout = 10 * time.Second
)

type Mux struct {
ln net.Listener

defaultLn *listener
lns []*listener
maxNeedBytesNum uint32
mu sync.RWMutex
}

func NewMux() (mux *Mux) {
mux = &Mux{
lns: make([]*listener, 0),
}
return
}

func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {
ln := &listener{
c: make(chan net.Conn),
mux: mux,
needBytesNum: needBytesNum,
matchFn: fn,
}

mux.mu.Lock()
defer mux.mu.Unlock()
if needBytesNum > mux.maxNeedBytesNum {
mux.maxNeedBytesNum = needBytesNum
}

newlns := append(mux.copyLns(), ln)
sort.Slice(newlns, func(i, j int) bool {
return newlns[i].needBytesNum < newlns[j].needBytesNum
})
mux.lns = newlns
return ln
}

func (mux *Mux) ListenHttp(priority int) net.Listener {
return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
}

func (mux *Mux) ListenHttps(priority int) net.Listener {
return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
}

func (mux *Mux) DefaultListener() net.Listener {
mux.mu.Lock()
defer mux.mu.Unlock()
if mux.defaultLn == nil {
mux.defaultLn = &listener{
c: make(chan net.Conn),
mux: mux,
}
}
return mux.defaultLn
}

func (mux *Mux) release(ln *listener) bool {
result := false
mux.mu.Lock()
defer mux.mu.Unlock()
lns := mux.copyLns()

for i, l := range lns {
if l == ln {
lns = append(lns[:i], lns[i+1:]...)
result = true
}
}
mux.lns = lns
return result
}

func (mux *Mux) copyLns() []*listener {
lns := make([]*listener, 0, len(mux.lns))
for _, l := range mux.lns {
lns = append(lns, l)
}
return lns
}

// Serve handles connections from ln and multiplexes then across registered listeners.
func (mux *Mux) Serve(ln net.Listener) error {
mux.mu.Lock()
mux.ln = ln
mux.mu.Unlock()
for {
// Wait for the next connection.
// If it returns a temporary error then simply retry.
// If it returns any other error then exit immediately.
conn, err := ln.Accept()
if err, ok := err.(interface {
Temporary() bool
}); ok && err.Temporary() {
continue
}

if err != nil {
return err
}

go mux.handleConn(conn)
}
}

func (mux *Mux) handleConn(conn net.Conn) {
mux.mu.RLock()
maxNeedBytesNum := mux.maxNeedBytesNum
lns := mux.lns
defaultLn := mux.defaultLn
mux.mu.RUnlock()

shareConn, rd := frpNet.NewShareConnSize(frpNet.WrapConn(conn), int(maxNeedBytesNum))
data := make([]byte, maxNeedBytesNum)

conn.SetReadDeadline(time.Now().Add(DefaultTimeout))
_, err := io.ReadFull(rd, data)
if err != nil {
conn.Close()
return
}
conn.SetReadDeadline(time.Time{})

for _, ln := range lns {
if match := ln.matchFn(data); match {
err = errors.PanicToError(func() {
ln.c <- shareConn
})
if err != nil {
conn.Close()
}
return
}
}

// No match listeners
if defaultLn != nil {
err = errors.PanicToError(func() {
defaultLn.c <- shareConn
})
if err != nil {
conn.Close()
}
return
}

// No listeners for this connection, close it.
conn.Close()
return
}

type listener struct {
mux *Mux

needBytesNum uint32
matchFn MatchFunc

c chan net.Conn
mu sync.RWMutex
}

// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (net.Conn, error) {
conn, ok := <-ln.c
if !ok {
return nil, fmt.Errorf("network connection closed")
}
return conn, nil
}

// Close removes this listener from the parent mux and closes the channel.
func (ln *listener) Close() error {
if ok := ln.mux.release(ln); ok {
// Close done to signal to any RLock holders to release their lock.
close(ln.c)
}
return nil
}

func (ln *listener) Addr() net.Addr {
if ln.mux == nil {
return nil
}
ln.mux.mu.RLock()
defer ln.mux.mu.RUnlock()
if ln.mux.ln == nil {
return nil
}
return ln.mux.ln.Addr()
}
Loading

0 comments on commit 5db605c

Please sign in to comment.