Skip to content

Commit

Permalink
Return error messages for some exposed methods, fix panic when connec…
Browse files Browse the repository at this point in the history
…tion is closed. Close olahol#21.
  • Loading branch information
Ola Holmström committed Feb 10, 2017
1 parent e39e120 commit 3321ec3
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 46 deletions.
11 changes: 7 additions & 4 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ loop:
if _, ok := h.sessions[s]; ok {
h.rwmutex.Lock()
delete(h.sessions, s)
s.conn.Close()
close(s.output)
h.rwmutex.Unlock()
}
case m := <-h.broadcast:
Expand All @@ -58,8 +56,7 @@ loop:
h.rwmutex.Lock()
for s := range h.sessions {
delete(h.sessions, s)
s.conn.Close()
close(s.output)
s.Close()
}
h.open = false
h.rwmutex.Unlock()
Expand All @@ -68,6 +65,12 @@ loop:
}
}

func (h *hub) closed() bool {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return !h.open
}

func (h *hub) len() int {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
Expand Down
69 changes: 54 additions & 15 deletions melody.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package melody

import (
"errors"
"github.com/gorilla/websocket"
"net/http"
"sync"
Expand Down Expand Up @@ -79,12 +80,15 @@ func (m *Melody) HandleError(fn func(*Session, error)) {
}

// HandleRequest upgrades http requests to websocket connections and dispatches them to be handled by the melody instance.
func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) {
func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) error {
if m.hub.closed() {
return errors.New("Melody instance is closed.")
}

conn, err := m.Upgrader.Upgrade(w, r, nil)

if err != nil {
m.errorHandler(nil, err)
return
return err
}

session := &Session{
Expand All @@ -93,7 +97,8 @@ func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) {
conn: conn,
output: make(chan *envelope, m.Config.MessageBufferSize),
melody: m,
lock: &sync.Mutex{},
open: true,
rwmutex: &sync.RWMutex{},
}

m.hub.register <- session
Expand All @@ -104,54 +109,88 @@ func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) {

session.readPump()

if m.hub.open {
if !m.hub.closed() {
m.hub.unregister <- session
}

go m.disconnectHandler(session)
session.close()

m.disconnectHandler(session)

return nil
}

// Broadcast broadcasts a text message to all sessions.
func (m *Melody) Broadcast(msg []byte) {
func (m *Melody) Broadcast(msg []byte) error {
if m.hub.closed() {
return errors.New("Melody instance is closed.")
}

message := &envelope{t: websocket.TextMessage, msg: msg}
m.hub.broadcast <- message

return nil
}

// BroadcastFilter broadcasts a text message to all sessions that fn returns true for.
func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) {
func (m *Melody) BroadcastFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("Melody instance is closed.")
}

message := &envelope{t: websocket.TextMessage, msg: msg, filter: fn}
m.hub.broadcast <- message

return nil
}

// BroadcastOthers broadcasts a text message to all sessions except session s.
func (m *Melody) BroadcastOthers(msg []byte, s *Session) {
m.BroadcastFilter(msg, func(q *Session) bool {
func (m *Melody) BroadcastOthers(msg []byte, s *Session) error {
return m.BroadcastFilter(msg, func(q *Session) bool {
return s != q
})
}

// BroadcastBinary broadcasts a binary message to all sessions.
func (m *Melody) BroadcastBinary(msg []byte) {
func (m *Melody) BroadcastBinary(msg []byte) error {
if m.hub.closed() {
return errors.New("Melody instance is closed.")
}

message := &envelope{t: websocket.BinaryMessage, msg: msg}
m.hub.broadcast <- message

return nil
}

// BroadcastBinaryFilter broadcasts a binary message to all sessions that fn returns true for.
func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) {
func (m *Melody) BroadcastBinaryFilter(msg []byte, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("Melody instance is closed.")
}

message := &envelope{t: websocket.BinaryMessage, msg: msg, filter: fn}
m.hub.broadcast <- message

return nil
}

// BroadcastBinaryOthers broadcasts a binary message to all sessions except session s.
func (m *Melody) BroadcastBinaryOthers(msg []byte, s *Session) {
m.BroadcastBinaryFilter(msg, func(q *Session) bool {
func (m *Melody) BroadcastBinaryOthers(msg []byte, s *Session) error {
return m.BroadcastBinaryFilter(msg, func(q *Session) bool {
return s != q
})
}

// Close closes the melody instance and all connected sessions.
func (m *Melody) Close() {
func (m *Melody) Close() error {
if m.hub.closed() {
return errors.New("Melody instance is already closed.")
}

m.hub.exit <- true

return nil
}

// Len return the number of connected sessions.
Expand Down
81 changes: 81 additions & 0 deletions melody_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,43 @@ func TestEcho(t *testing.T) {
}
}

func TestWriteClosed(t *testing.T) {
echo := NewTestServerHandler(func(session *Session, msg []byte) {
session.Write(msg)
})
server := httptest.NewServer(echo)
defer server.Close()

fn := func(msg string) bool {
conn, err := NewDialer(server.URL)

if err != nil {
t.Error(err)
return false
}

conn.WriteMessage(websocket.TextMessage, []byte(msg))

echo.m.HandleConnect(func(s *Session) {
s.Close()
})

echo.m.HandleDisconnect(func(s *Session) {
err := s.Write([]byte("hello world"))

if err == nil {
t.Error("should be an error")
}
})

return true
}

if err := quick.Check(fn, nil); err != nil {
t.Error(err)
}
}

func TestLen(t *testing.T) {
rand.Seed(time.Now().UnixNano())

Expand Down Expand Up @@ -641,3 +678,47 @@ func TestPong(t *testing.T) {
t.Error("should have fired pong handler")
}
}

func BenchmarkSessionWrite(b *testing.B) {
echo := NewTestServerHandler(func(session *Session, msg []byte) {
session.Write(msg)
})
server := httptest.NewServer(echo)
conn, _ := NewDialer(server.URL)
defer server.Close()
defer conn.Close()

for n := 0; n < b.N; n++ {
conn.WriteMessage(websocket.TextMessage, []byte("test"))
conn.ReadMessage()
}
}

func BenchmarkBroadcast(b *testing.B) {
echo := NewTestServerHandler(func(session *Session, msg []byte) {
session.Write(msg)
})
server := httptest.NewServer(echo)
defer server.Close()

conns := make([]*websocket.Conn, 0)

num := 100

for i := 0; i < num; i++ {
conn, _ := NewDialer(server.URL)
conns = append(conns, conn)
}

for n := 0; n < b.N; n++ {
echo.m.Broadcast([]byte("test"))

for i := 0; i < num; i++ {
conns[i].ReadMessage()
}
}

for i := 0; i < num; i++ {
conns[i].Close()
}
}
Loading

0 comments on commit 3321ec3

Please sign in to comment.