Skip to content

Commit

Permalink
adjust code style about config (fatedier#3464)
Browse files Browse the repository at this point in the history
  • Loading branch information
fatedier authored May 30, 2023
1 parent 341a5e3 commit 9aef3b9
Show file tree
Hide file tree
Showing 29 changed files with 262 additions and 474 deletions.
2 changes: 1 addition & 1 deletion client/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewProxyStatusResp(status *proxy.WorkingStatus, serverAddr string) ProxySta
Status: status.Phase,
Err: status.Err,
}
baseCfg := status.Cfg.GetBaseInfo()
baseCfg := status.Cfg.GetBaseConfig()
if baseCfg.LocalPort != 0 {
psr.LocalAddr = net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort))
}
Expand Down
2 changes: 1 addition & 1 deletion client/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewControl(
ctl.msgTransporter = transport.NewMessageTransporter(ctl.sendCh)
ctl.pm = proxy.NewManager(ctl.ctx, clientCfg, ctl.msgTransporter)

ctl.vm = visitor.NewManager(ctl.ctx, ctl.clientCfg, ctl.connectServer, ctl.msgTransporter)
ctl.vm = visitor.NewManager(ctl.ctx, ctl.runID, ctl.clientCfg, ctl.connectServer, ctl.msgTransporter)
ctl.vm.Reload(visitorCfgs)
return ctl
}
Expand Down
14 changes: 7 additions & 7 deletions client/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func NewProxy(
msgTransporter transport.MessageTransporter,
) (pxy Proxy) {
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeClient {
limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeClient {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func (pxy *TCPProxy) Close() {
}

func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}

Expand Down Expand Up @@ -177,7 +177,7 @@ func (pxy *TCPMuxProxy) Close() {
}

func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}

Expand Down Expand Up @@ -206,7 +206,7 @@ func (pxy *HTTPProxy) Close() {
}

func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}

Expand Down Expand Up @@ -235,7 +235,7 @@ func (pxy *HTTPSProxy) Close() {
}

func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}

Expand Down Expand Up @@ -264,7 +264,7 @@ func (pxy *STCPProxy) Close() {
}

func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
conn, []byte(pxy.clientCfg.Token), m)
}

Expand Down
10 changes: 4 additions & 6 deletions client/proxy/proxy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"reflect"
"sync"

"github.com/fatedier/frp/client/event"
Expand Down Expand Up @@ -121,21 +122,18 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
for name, pxy := range pm.proxies {
del := false
cfg, ok := pxyCfgs[name]
if !ok {
del = true
} else if !pxy.Cfg.Compare(cfg) {
if !ok || !reflect.DeepEqual(pxy.Cfg, cfg) {
del = true
}

if del {
delPxyNames = append(delPxyNames, name)
delete(pm.proxies, name)

pxy.Stop()
}
}
if len(delPxyNames) > 0 {
xl.Info("proxy removed: %v", delPxyNames)
xl.Info("proxy removed: %s", delPxyNames)
}

addPxyNames := make([]string, 0)
Expand All @@ -149,6 +147,6 @@ func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
}
}
if len(addPxyNames) > 0 {
xl.Info("proxy added: %v", addPxyNames)
xl.Info("proxy added: %s", addPxyNames)
}
}
2 changes: 1 addition & 1 deletion client/proxy/proxy_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewWrapper(
eventHandler event.Handler,
msgTransporter transport.MessageTransporter,
) *Wrapper {
baseInfo := cfg.GetBaseInfo()
baseInfo := cfg.GetBaseConfig()
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName)
pw := &Wrapper{
WorkingStatus: WorkingStatus{
Expand Down
4 changes: 2 additions & 2 deletions client/proxy/xtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, s
xl.Error("accept connection error: %v", err)
return
}
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
muxConn, []byte(pxy.cfg.Sk), startWorkConnMsg)
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, star
_ = c.CloseWithError(0, "")
return
}
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
go HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseConfig(), pxy.limiter,
utilnet.QuicStreamToNetConn(stream, c), []byte(pxy.cfg.Sk), startWorkConnMsg)
}
}
3 changes: 2 additions & 1 deletion client/visitor/stcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
defer userConn.Close()

xl.Debug("get a new stcp user connection")
visitorConn, err := sv.connectServer()
visitorConn, err := sv.helper.ConnectServer()
if err != nil {
return
}
defer visitorConn.Close()

now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
Expand Down
3 changes: 2 additions & 1 deletion client/visitor/sudp.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,14 @@ func (sv *SUDPVisitor) worker(workConn net.Conn, firstPacket *msg.UDPPacket) {

func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
xl := xlog.FromContextSafe(sv.ctx)
visitorConn, err := sv.connectServer()
visitorConn, err := sv.helper.ConnectServer()
if err != nil {
return nil, fmt.Errorf("frpc connect frps error: %v", err)
}

now := time.Now().Unix()
newVisitorConnMsg := &msg.NewVisitorConn{
RunID: sv.helper.RunID(),
ProxyName: sv.cfg.ServerName,
SignKey: util.GetAuthKey(sv.cfg.Sk, now),
Timestamp: now,
Expand Down
39 changes: 23 additions & 16 deletions client/visitor/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@ import (
"github.com/fatedier/frp/pkg/util/xlog"
)

// Helper wrapps some functions for visitor to use.
type Helper interface {
// ConnectServer directly connects to the frp server.
ConnectServer() (net.Conn, error)
// TransferConn transfers the connection to another visitor.
TransferConn(string, net.Conn) error
// MsgTransporter returns the message transporter that is used to send and receive messages
// to the frp server through the controller.
MsgTransporter() transport.MessageTransporter
// RunID returns the run id of current controller.
RunID() string
}

// Visitor is used for forward traffics from local port tot remote service.
type Visitor interface {
Run() error
Expand All @@ -36,18 +49,14 @@ func NewVisitor(
ctx context.Context,
cfg config.VisitorConf,
clientCfg config.ClientCommonConf,
connectServer func() (net.Conn, error),
transferConn func(string, net.Conn) error,
msgTransporter transport.MessageTransporter,
helper Helper,
) (visitor Visitor) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseInfo().ProxyName)
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(cfg.GetBaseConfig().ProxyName)
baseVisitor := BaseVisitor{
clientCfg: clientCfg,
connectServer: connectServer,
transferConn: transferConn,
msgTransporter: msgTransporter,
ctx: xlog.NewContext(ctx, xl),
internalLn: utilnet.NewInternalListener(),
clientCfg: clientCfg,
helper: helper,
ctx: xlog.NewContext(ctx, xl),
internalLn: utilnet.NewInternalListener(),
}
switch cfg := cfg.(type) {
case *config.STCPVisitorConf:
Expand All @@ -72,12 +81,10 @@ func NewVisitor(
}

type BaseVisitor struct {
clientCfg config.ClientCommonConf
connectServer func() (net.Conn, error)
transferConn func(string, net.Conn) error
msgTransporter transport.MessageTransporter
l net.Listener
internalLn *utilnet.InternalListener
clientCfg config.ClientCommonConf
helper Helper
l net.Listener
internalLn *utilnet.InternalListener

mu sync.RWMutex
ctx context.Context
Expand Down
67 changes: 47 additions & 20 deletions client/visitor/visitor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"net"
"reflect"
"sync"
"time"

Expand All @@ -27,11 +28,10 @@ import (
)

type Manager struct {
clientCfg config.ClientCommonConf
connectServer func() (net.Conn, error)
msgTransporter transport.MessageTransporter
cfgs map[string]config.VisitorConf
visitors map[string]Visitor
clientCfg config.ClientCommonConf
cfgs map[string]config.VisitorConf
visitors map[string]Visitor
helper Helper

checkInterval time.Duration

Expand All @@ -43,20 +43,26 @@ type Manager struct {

func NewManager(
ctx context.Context,
runID string,
clientCfg config.ClientCommonConf,
connectServer func() (net.Conn, error),
msgTransporter transport.MessageTransporter,
) *Manager {
return &Manager{
clientCfg: clientCfg,
connectServer: connectServer,
msgTransporter: msgTransporter,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
stopCh: make(chan struct{}),
m := &Manager{
clientCfg: clientCfg,
cfgs: make(map[string]config.VisitorConf),
visitors: make(map[string]Visitor),
checkInterval: 10 * time.Second,
ctx: ctx,
stopCh: make(chan struct{}),
}
m.helper = &visitorHelperImpl{
connectServerFn: connectServer,
msgTransporter: msgTransporter,
transferConnFn: m.TransferConn,
runID: runID,
}
return m
}

func (vm *Manager) Run() {
Expand All @@ -73,7 +79,7 @@ func (vm *Manager) Run() {
case <-ticker.C:
vm.mu.Lock()
for _, cfg := range vm.cfgs {
name := cfg.GetBaseInfo().ProxyName
name := cfg.GetBaseConfig().ProxyName
if _, exist := vm.visitors[name]; !exist {
xl.Info("try to start visitor [%s]", name)
_ = vm.startVisitor(cfg)
Expand All @@ -100,8 +106,8 @@ func (vm *Manager) Close() {
// Hold lock before calling this function.
func (vm *Manager) startVisitor(cfg config.VisitorConf) (err error) {
xl := xlog.FromContextSafe(vm.ctx)
name := cfg.GetBaseInfo().ProxyName
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.connectServer, vm.TransferConn, vm.msgTransporter)
name := cfg.GetBaseConfig().ProxyName
visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
err = visitor.Run()
if err != nil {
xl.Warn("start error: %v", err)
Expand All @@ -121,9 +127,7 @@ func (vm *Manager) Reload(cfgs map[string]config.VisitorConf) {
for name, oldCfg := range vm.cfgs {
del := false
cfg, ok := cfgs[name]
if !ok {
del = true
} else if !oldCfg.Compare(cfg) {
if !ok || !reflect.DeepEqual(oldCfg, cfg) {
del = true
}

Expand Down Expand Up @@ -163,3 +167,26 @@ func (vm *Manager) TransferConn(name string, conn net.Conn) error {
}
return v.AcceptConn(conn)
}

type visitorHelperImpl struct {
connectServerFn func() (net.Conn, error)
msgTransporter transport.MessageTransporter
transferConnFn func(name string, conn net.Conn) error
runID string
}

func (v *visitorHelperImpl) ConnectServer() (net.Conn, error) {
return v.connectServerFn()
}

func (v *visitorHelperImpl) TransferConn(name string, conn net.Conn) error {
return v.transferConnFn(name, conn)
}

func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter {
return v.msgTransporter
}

func (v *visitorHelperImpl) RunID() string {
return v.runID
}
6 changes: 3 additions & 3 deletions client/visitor/xtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
}

xl.Debug("try to transfer connection to visitor: %s", sv.cfg.FallbackTo)
if err := sv.transferConn(sv.cfg.FallbackTo, userConn); err != nil {
if err := sv.helper.TransferConn(sv.cfg.FallbackTo, userConn); err != nil {
xl.Error("transfer connection to visitor %s error: %v", sv.cfg.FallbackTo, err)
return
}
Expand Down Expand Up @@ -266,7 +266,7 @@ func (sv *XTCPVisitor) getTunnelConn() (net.Conn, error) {
// 4. Create a tunnel session using an underlying UDP connection.
func (sv *XTCPVisitor) makeNatHole() {
xl := xlog.FromContextSafe(sv.ctx)
if err := nathole.PreCheck(sv.ctx, sv.msgTransporter, sv.cfg.ServerName, 5*time.Second); err != nil {
if err := nathole.PreCheck(sv.ctx, sv.helper.MsgTransporter(), sv.cfg.ServerName, 5*time.Second); err != nil {
xl.Warn("nathole precheck error: %v", err)
return
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (sv *XTCPVisitor) makeNatHole() {
AssistedAddrs: prepareResult.AssistedAddrs,
}

natHoleRespMsg, err := nathole.ExchangeInfo(sv.ctx, sv.msgTransporter, transactionID, natHoleVisitorMsg, 5*time.Second)
natHoleRespMsg, err := nathole.ExchangeInfo(sv.ctx, sv.helper.MsgTransporter(), transactionID, natHoleVisitorMsg, 5*time.Second)
if err != nil {
listenConn.Close()
xl.Warn("nathole exchange info error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/frpc/sub/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var httpCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode

err = cfg.CheckForCli()
err = cfg.ValidateForClient()
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion cmd/frpc/sub/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var httpsCmd = &cobra.Command{
}
cfg.BandwidthLimitMode = bandwidthLimitMode

err = cfg.CheckForCli()
err = cfg.ValidateForClient()
if err != nil {
fmt.Println(err)
os.Exit(1)
Expand Down
Loading

0 comments on commit 9aef3b9

Please sign in to comment.