Skip to content

Commit

Permalink
fix: panic: send on closed channel
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <[email protected]>
  • Loading branch information
zhaojh329 committed Sep 22, 2021
1 parent 4548689 commit cc5e8b7
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
26 changes: 20 additions & 6 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ type broker struct {
termMessage chan *termMessage
fileMessage chan *fileMessage
userMessage chan *usrMessage
cmdMessage chan []byte
httpMessage chan *httpResp
cmdResp chan []byte
cmdReq chan *commandReq
httpResp chan *httpResp
httpReq chan *httpReq
fileProxy sync.Map
devCertPool *x509.CertPool
}
Expand All @@ -53,8 +55,10 @@ func newBroker(cfg *config.Config) *broker {
termMessage: make(chan *termMessage, 1000),
fileMessage: make(chan *fileMessage, 1000),
userMessage: make(chan *usrMessage, 1000),
cmdMessage: make(chan []byte, 1000),
httpMessage: make(chan *httpResp, 1000),
cmdResp: make(chan []byte, 1000),
cmdReq: make(chan *commandReq, 1000),
httpResp: make(chan *httpResp, 1000),
httpReq: make(chan *httpReq, 1000),
}
}

Expand Down Expand Up @@ -272,10 +276,20 @@ func (br *broker) run() {
log.Error().Msg("Not found sid: " + msg.sid)
}

case data := <-br.cmdMessage:
case req := <-br.cmdReq:
if dev, ok := br.devices[req.devid]; ok {
dev.WriteMsg(msgTypeCmd, req.data)
}

case data := <-br.cmdResp:
handleCmdResp(data)

case resp := <-br.httpMessage:
case req := <-br.httpReq:
if dev, ok := br.devices[req.devid]; ok {
dev.WriteMsg(msgTypeHttp, req.data)
}

case resp := <-br.httpResp:
handleHttpProxyResp(resp)
}
}
Expand Down
8 changes: 6 additions & 2 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type commandInfo struct {
type commandReq struct {
cancel context.CancelFunc
c *gin.Context
devid string
data []byte
}

var commands sync.Map
Expand Down Expand Up @@ -68,9 +70,10 @@ func handleCmdReq(br *broker, c *gin.Context) {
req := &commandReq{
cancel: cancel,
c: c,
devid: devid,
}

dev, ok := br.devices[devid]
_, ok := br.devices[devid]
if !ok {
cmdErrReply(rttyCmdErrOffline, req)
return
Expand Down Expand Up @@ -108,7 +111,8 @@ func handleCmdReq(br *broker, c *gin.Context) {
msg = append(msg, 0)
}

dev.WriteMsg(msgTypeCmd, msg)
req.data = msg
br.cmdReq <- req

waitTime := commandTimeout

Expand Down
4 changes: 2 additions & 2 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ func (dev *device) readLoop() {
return
}

dev.br.cmdMessage <- b
dev.br.cmdResp <- b

case msgTypeHttp:
if msgLen < 18 {
log.Error().Msg("msgTypeHttp: invalid")
return
}

dev.br.httpMessage <- &httpResp{b, dev}
dev.br.httpResp <- &httpResp{b, dev}

case msgTypeHeartbeat:
parseHeartbeat(dev, b)
Expand Down
16 changes: 10 additions & 6 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type httpResp struct {
dev client.Client
}

type httpReq struct {
devid string
data []byte
}

var httpProxyCons sync.Map
var httpProxySessions sync.Map

Expand Down Expand Up @@ -72,17 +77,16 @@ type HttpProxyWriter struct {
destAddr []byte
srcAddr []byte
hostHeaderRewrite string
dev client.Client
br *broker
devid string
}

func (rw *HttpProxyWriter) Write(p []byte) (n int, err error) {
msg := append([]byte{}, rw.srcAddr...)
msg = append(msg, rw.destAddr...)
msg = append(msg, p...)

dev := rw.dev.(*device)

dev.WriteMsg(msgTypeHttp, msg)
rw.br.httpReq <- &httpReq{rw.devid, msg}

return len(p), nil
}
Expand All @@ -108,7 +112,7 @@ func doHttpProxy(brk *broker, c net.Conn) {
}
devid := cookie.Value

dev, ok := brk.devices[devid]
_, ok := brk.devices[devid]
if !ok {
return
}
Expand Down Expand Up @@ -153,7 +157,7 @@ func doHttpProxy(brk *broker, c net.Conn) {
return
}

hpw := &HttpProxyWriter{destAddr, srcAddr, hostHeaderRewrite, dev}
hpw := &HttpProxyWriter{destAddr, srcAddr, hostHeaderRewrite, brk, devid}

req.Host = hostHeaderRewrite
hpw.WriteRequest(req)
Expand Down

0 comments on commit cc5e8b7

Please sign in to comment.