Skip to content

Commit

Permalink
all: fix bug when client shutdown and reconnect, server response alre…
Browse files Browse the repository at this point in the history
…ady use

1. if client is offline, server will release all resources
2. use a graceful method to shutdown go net.Listeners
3. add closeFlag for Conn, so startHeartBeat func can exit correctly now
  • Loading branch information
fatedier committed Feb 19, 2016
1 parent 0f72713 commit 26479cf
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 146 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
go_import_path: github.com/fatedier/frp
sudo: false
language: go

go:
- 1.4.2
- 1.5.2
- tip

install:
- make
Expand Down
96 changes: 44 additions & 52 deletions cmd/frpc/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"fmt"
"io"
"sync"
"time"
Expand All @@ -18,8 +19,8 @@ var isHeartBeatContinue bool = true
func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
defer wait.Done()

c := loginToServer(cli)
if c == nil {
c, err := loginToServer(cli)
if err != nil {
log.Error("ProxyName [%s], connect to server failed!", cli.Name)
return
}
Expand All @@ -34,15 +35,15 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
var sleepTime time.Duration = 1
for {
log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
tmpConn := loginToServer(cli)
if tmpConn != nil {
tmpConn, err := loginToServer(cli)
if err == nil {
c.Close()
c = tmpConn
break
}

if sleepTime < 60 {
sleepTime++
sleepTime = sleepTime * 2
}
time.Sleep(sleepTime * time.Second)
}
Expand All @@ -56,71 +57,62 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
}
}

func loginToServer(cli *client.ProxyClient) (connection *conn.Conn) {
c := &conn.Conn{}

connection = nil
for i := 0; i < 1; i++ {
err := c.ConnectServer(client.ServerAddr, client.ServerPort)
if err != nil {
log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, client.ServerAddr, client.ServerPort, err)
break
}

req := &msg.ClientCtlReq{
Type: consts.CtlConn,
ProxyName: cli.Name,
Passwd: cli.Passwd,
}
buf, _ := json.Marshal(req)
err = c.Write(string(buf) + "\n")
if err != nil {
log.Error("ProxyName [%s], write to server error, %v", cli.Name, err)
break
}

res, err := c.ReadLine()
if err != nil {
log.Error("ProxyName [%s], read from server error, %v", cli.Name, err)
break
}
log.Debug("ProxyName [%s], read [%s]", cli.Name, res)
func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
c, err = conn.ConnectServer(client.ServerAddr, client.ServerPort)
if err != nil {
log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, client.ServerAddr, client.ServerPort, err)
return
}

clientCtlRes := &msg.ClientCtlRes{}
if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
log.Error("ProxyName [%s], format server response error, %v", cli.Name, err)
break
}
req := &msg.ClientCtlReq{
Type: consts.CtlConn,
ProxyName: cli.Name,
Passwd: cli.Passwd,
}
buf, _ := json.Marshal(req)
err = c.Write(string(buf) + "\n")
if err != nil {
log.Error("ProxyName [%s], write to server error, %v", cli.Name, err)
return
}

if clientCtlRes.Code != 0 {
log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg)
break
}
res, err := c.ReadLine()
if err != nil {
log.Error("ProxyName [%s], read from server error, %v", cli.Name, err)
return
}
log.Debug("ProxyName [%s], read [%s]", cli.Name, res)

connection = c
go startHeartBeat(connection)
log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort)
clientCtlRes := &msg.ClientCtlRes{}
if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
log.Error("ProxyName [%s], format server response error, %v", cli.Name, err)
return
}

if connection == nil {
c.Close()
if clientCtlRes.Code != 0 {
log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg)
return c, fmt.Errorf("%s", clientCtlRes.Msg)
}

go startHeartBeat(c)
log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort)

return
}

func startHeartBeat(con *conn.Conn) {
isHeartBeatContinue = true
func startHeartBeat(c *conn.Conn) {
log.Debug("Start to send heartbeat")
for {
time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second)
if isHeartBeatContinue {
err := con.Write("\n")
if !c.IsClosed() {
err := c.Write("\n")
if err != nil {
log.Error("Send hearbeat to server failed! Err:%s", err.Error())
continue
}
} else {
break
}
}
log.Info("heartbeat exit")
}
24 changes: 12 additions & 12 deletions cmd/frps/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ func controlWorker(c *conn.Conn) {
serverCtlReq := &msg.ClientCtlReq{}
serverCtlReq.Type = consts.WorkConn
for {
_, isStop := s.WaitUserConn()
if isStop {
closeFlag := s.WaitUserConn()
if closeFlag {
log.Debug("ProxyName [%s], goroutine for dealing user conn is closed", s.Name)
break
}
buf, _ := json.Marshal(serverCtlReq)
Expand All @@ -90,7 +91,7 @@ func controlWorker(c *conn.Conn) {
log.Debug("ProxyName [%s], write to client to add work conn success", s.Name)
}

log.Error("ProxyName [%s], I'm dead!", s.Name)
log.Info("ProxyName [%s], I'm dead!", s.Name)
return
}

Expand Down Expand Up @@ -152,26 +153,25 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
isContinueRead := true
f := func() {
isContinueRead = false
s.StopWaitUserConn()
c.Close()
s.Close()
}
timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
defer timer.Stop()

for isContinueRead {
content, err := c.ReadLine()
//log.Debug("Receive msg from client! content:%s", content)
_, err := c.ReadLine()
if err != nil {
if err == io.EOF {
log.Warn("Server detect client[%s] is dead!", s.Name)
s.StopWaitUserConn()
log.Warn("ProxyName [%s], client is dead!", s.Name)
c.Close()
s.Close()
break
}
log.Error("ProxyName [%s], read error:%s", s.Name, err.Error())
log.Error("ProxyName [%s], read error: %v", s.Name, err)
continue
}

if content == "\n" {
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
}
timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
}
}
6 changes: 2 additions & 4 deletions models/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@ type ProxyClient struct {
}

func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) {
c = &conn.Conn{}
err = c.ConnectServer("127.0.0.1", p.LocalPort)
c, err = conn.ConnectServer("127.0.0.1", p.LocalPort)
if err != nil {
log.Error("ProxyName [%s], connect to local port error, %v", p.Name, err)
}
return
}

func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err error) {
c = &conn.Conn{}
defer func() {
if err != nil {
c.Close()
}
}()

err = c.ConnectServer(addr, port)
c, err = conn.ConnectServer(addr, port)
if err != nil {
log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", p.Name, addr, port, err)
return
Expand Down
82 changes: 44 additions & 38 deletions models/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,38 @@ import (
)

type ProxyServer struct {
Name string
Passwd string
BindAddr string
ListenPort int64

Status int64
Listener *conn.Listener // accept new connection from remote users
CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
StopBlockChan chan int64 // put any number to the channel, if you want to stop wait user conn
CliConnChan chan *conn.Conn // get client conns from control goroutine
UserConnList *list.List // store user conns
Mutex sync.Mutex
Name string
Passwd string
BindAddr string
ListenPort int64
Status int64
CliConnChan chan *conn.Conn // get client conns from control goroutine

listener *conn.Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
userConnList *list.List // store user conns
mutex sync.Mutex
}

func (p *ProxyServer) Init() {
p.Status = consts.Idle
p.CtlMsgChan = make(chan int64)
p.StopBlockChan = make(chan int64)
p.CliConnChan = make(chan *conn.Conn)
p.UserConnList = list.New()
p.ctlMsgChan = make(chan int64)
p.userConnList = list.New()
}

func (p *ProxyServer) Lock() {
p.Mutex.Lock()
p.mutex.Lock()
}

func (p *ProxyServer) Unlock() {
p.Mutex.Unlock()
p.mutex.Unlock()
}

// start listening for user conns
func (p *ProxyServer) Start() (err error) {
p.Listener, err = conn.Listen(p.BindAddr, p.ListenPort)
p.Init()
p.listener, err = conn.Listen(p.BindAddr, p.ListenPort)
if err != nil {
return err
}
Expand All @@ -53,36 +52,45 @@ func (p *ProxyServer) Start() (err error) {
go func() {
for {
// block
c := p.Listener.GetConn()
// if listener is closed, get nil
c := p.listener.GetConn()
if c == nil {
log.Info("ProxyName [%s], listener is closed", p.Name)
return
}
log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())

// put to list
// insert into list
p.Lock()
if p.Status != consts.Working {
log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
c.Close()
p.Unlock()
return
}
p.UserConnList.PushBack(c)
p.userConnList.PushBack(c)
p.Unlock()

// put msg to control conn
p.CtlMsgChan <- 1
p.ctlMsgChan <- 1
}
}()

// start another goroutine for join two conns from client and user
go func() {
for {
cliConn := <-p.CliConnChan
cliConn, ok := <-p.CliConnChan
if !ok {
return
}

p.Lock()
element := p.UserConnList.Front()
element := p.userConnList.Front()

var userConn *conn.Conn
if element != nil {
userConn = element.Value.(*conn.Conn)
p.UserConnList.Remove(element)
p.userConnList.Remove(element)
} else {
cliConn.Close()
p.Unlock()
Expand All @@ -104,21 +112,19 @@ func (p *ProxyServer) Start() (err error) {
func (p *ProxyServer) Close() {
p.Lock()
p.Status = consts.Idle
p.CtlMsgChan = make(chan int64)
p.CliConnChan = make(chan *conn.Conn)
p.UserConnList = list.New()
p.listener.Close()
close(p.ctlMsgChan)
close(p.CliConnChan)
p.userConnList = list.New()
p.Unlock()
}

func (p *ProxyServer) WaitUserConn() (res int64, isStop bool) {
select {
case res = <-p.CtlMsgChan:
return res, false
case <-p.StopBlockChan:
return 0, true
}
}
func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
closeFlag = false

func (p *ProxyServer) StopWaitUserConn() {
p.StopBlockChan <- 1
_, ok := <-p.ctlMsgChan
if !ok {
closeFlag = true
}
return
}
Loading

0 comments on commit 26479cf

Please sign in to comment.