Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Jianhui Zhao <[email protected]>
  • Loading branch information
zhaojh329 committed Feb 2, 2021
1 parent 506bf69 commit 1db48db
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 49 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

// Client abstract device and user
type Client interface {
WriteMsg(typ int, data []byte) error
WriteMsg(typ int, data []byte)

// For users, return the device ID that the user wants to access
// For devices, return the ID of the device
Expand Down
72 changes: 35 additions & 37 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,12 @@ func (dev *device) DeviceID() string {
return dev.id
}

func (dev *device) keepAlive(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

ninactive := 0
lastHeartbeat := time.Now()
func (dev *device) WriteMsg(typ int, data []byte) {
b := []byte{byte(typ), 0, 0}

for {
select {
case <-ticker.C:
now := time.Now()
if now.Sub(dev.active) > heartbeatInterval*3/2 {
log.Error().Msgf("Inactive device in long time: %s", dev.id)
if ninactive > 1 {
log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id)
dev.Close()
return
}
ninactive = ninactive + 1
}
binary.BigEndian.PutUint16(b[1:], uint16(len(data)))

if now.Sub(lastHeartbeat) > heartbeatInterval-1 {
lastHeartbeat = now
dev.WriteMsg(msgTypeHeartbeat, []byte{})
}
case <-ctx.Done():
return
}
}
dev.conn.Write(append(b, data...))
}

func (dev *device) Close() {
Expand All @@ -112,16 +89,6 @@ func (dev *device) Close() {
}
}

func (dev *device) WriteMsg(typ int, data []byte) error {
b := []byte{byte(typ), 0, 0}

binary.BigEndian.PutUint16(b[1:], uint16(len(data)))

_, err := dev.conn.Write(append(b, data...))

return err
}

func parseDeviceInfo(b []byte) (string, string, string) {
fields := bytes.Split(b, []byte{0})

Expand All @@ -140,6 +107,37 @@ func parseHeartbeat(dev *device, b []byte) {
dev.uptime = binary.BigEndian.Uint32(b[:4])
}

func (dev *device) keepAlive(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

ninactive := 0
lastHeartbeat := time.Now()

for {
select {
case <-ticker.C:
now := time.Now()
if now.Sub(dev.active) > heartbeatInterval*3/2 {
log.Error().Msgf("Inactive device in long time: %s", dev.id)
if ninactive > 1 {
log.Error().Msgf("Inactive 3 times, now kill it: %s", dev.id)
dev.Close()
return
}
ninactive = ninactive + 1
}

if now.Sub(lastHeartbeat) > heartbeatInterval-1 {
lastHeartbeat = now
dev.WriteMsg(msgTypeHeartbeat, []byte{})
}
case <-ctx.Done():
return
}
}
}

func (dev *device) readLoop() {
defer dev.Close()

Expand Down
54 changes: 43 additions & 11 deletions user.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"sync"
"time"
Expand All @@ -26,6 +27,8 @@ type user struct {
conn *websocket.Conn
closeMutex sync.Mutex
closed bool
cancel context.CancelFunc
send chan *usrMessage // Buffered channel of outbound messages.
}

type usrMessage struct {
Expand All @@ -48,8 +51,11 @@ func (u *user) DeviceID() string {
return u.devid
}

func (u *user) WriteMsg(typ int, data []byte) error {
return u.conn.WriteMessage(typ, data)
func (u *user) WriteMsg(typ int, data []byte) {
u.send <- &usrMessage{
typ: typ,
data: data,
}
}

func (u *user) Close() {
Expand All @@ -59,6 +65,7 @@ func (u *user) Close() {

if !u.closed {
u.closed = true
u.cancel()
u.conn.Close()
u.br.unregister <- u
}
Expand All @@ -69,14 +76,17 @@ func userLoginAck(code int, c client.Client) {
c.WriteMsg(websocket.TextMessage, []byte(msg))
}

func (u *user) keepAlive() {
func (u *user) keepAlive(ctx context.Context) {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
err := u.WriteMsg(websocket.PingMessage, []byte{})
if err != nil {
select {
case <-ticker.C:
u.WriteMsg(websocket.PingMessage, []byte{})
case <-ctx.Done():
return
}

time.Sleep(time.Second * 5)
}
}

Expand All @@ -96,6 +106,23 @@ func (u *user) readLoop() {
}
}

func (u *user) writeLoop(ctx context.Context) {
defer u.Close()

for {
select {
case msg := <-u.send:
err := u.conn.WriteMessage(msg.typ, msg.data)
if err != nil {
log.Error().Msg(err.Error())
return
}
case <-ctx.Done():
return
}
}
}

func serveUser(br *broker, c *gin.Context) {
devid := c.Param("devid")
if devid == "" {
Expand All @@ -110,14 +137,19 @@ func serveUser(br *broker, c *gin.Context) {
return
}

ctx, cancel := context.WithCancel(context.Background())

u := &user{
br: br,
conn: conn,
devid: devid,
br: br,
conn: conn,
devid: devid,
cancel: cancel,
send: make(chan *usrMessage, 256),
}

go u.keepAlive()
go u.readLoop()
go u.keepAlive(ctx)
go u.writeLoop(ctx)

br.register <- u
}

0 comments on commit 1db48db

Please sign in to comment.