Skip to content

Commit

Permalink
refactor: redesign group interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Aug 7, 2017
1 parent 88a70b3 commit 985742d
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 62 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ implement a chat room in 100 lines with golang and websocket
package main

import (
"fmt"
"log"
"net/http"

"fmt"
"github.com/lonnng/nano"
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/serialize/json"
Expand Down Expand Up @@ -90,16 +90,16 @@ func NewRoom() *Room {
}
}

func (r *Room) AfterInit(){
func (r *Room) AfterInit() {
nano.OnSessionClosed(func(s *session.Session) {
r.group.Leave(s.Uid())
r.group.Leave(s)
})
}

// Join room
func (r *Room) Join(s *session.Session, msg []byte) error {
s.Bind(s.ID()) // binding session uid
s.Push("onMembers", &AllMembers{Members:r.group.Members()})
s.Push("onMembers", &AllMembers{Members: r.group.Members()})
// notify others
r.group.Broadcast("onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
// new user join group
Expand All @@ -123,7 +123,6 @@ func main() {
nano.SetCheckOriginFunc(func(_ *http.Request) bool { return true })
nano.ListenWS(":3250")
}

```

- client
Expand Down
8 changes: 4 additions & 4 deletions docs/get_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ import (
"fmt"
"log"
"net/http"

"github.com/lonnng/nano"
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/serialize/json"
Expand Down Expand Up @@ -153,16 +153,16 @@ func NewRoom() *Room {
}
}

func (r *Room) AfterInit(){
func (r *Room) AfterInit() {
nano.OnSessionClosed(func(s *session.Session) {
r.group.Leave(s.Uid())
r.group.Leave(s)
})
}

// Join room
func (r *Room) Join(s *session.Session, msg []byte) error {
s.Bind(s.ID()) // binding session uid
s.Push("onMembers", &AllMembers{Members:r.group.Members()})
s.Push("onMembers", &AllMembers{Members: r.group.Members()})
// notify others
r.group.Broadcast("onNewUser", &NewUser{Content: fmt.Sprintf("New user: %d", s.ID())})
// new user join group
Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ var (
ErrClosedGroup = errors.New("group closed")
ErrMemberNotFound = errors.New("member not found in the group")
ErrCloseClosedSession = errors.New("close closed session")
ErrSessionDuplication = errors.New("session has existed in the current group")
)
4 changes: 2 additions & 2 deletions examples/demo/chat/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package main

import (
"fmt"
"log"
"net/http"

"fmt"
"github.com/lonnng/nano"
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/serialize/json"
Expand Down Expand Up @@ -46,7 +46,7 @@ func NewRoom() *Room {

func (r *Room) AfterInit() {
nano.OnSessionClosed(func(s *session.Session) {
r.group.Leave(s.Uid())
r.group.Leave(s)
})
}

Expand Down
103 changes: 52 additions & 51 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,53 @@ const (
groupStatusClosed = 1
)

// SessionFilter represents a filter which was used to filter session when Multicast,
// the session will receive the message while filter returns true.
type SessionFilter func(*session.Session) bool

// Group represents a session group which used to manage a number of
// sessions, data send to the group will send to all session in it.
type Group struct {
sync.RWMutex
status int32
name string // channel name
uids map[int64]*session.Session // uid map to session pointer
members []int64 // all user ids
status int32 // channel current status
name string // channel name
sessions map[int64]*session.Session // session id map to session instance
}

// NewGroup returns a new group instance
func NewGroup(n string) *Group {
return &Group{
status: groupStatusWorking,
name: n,
uids: make(map[int64]*session.Session),
status: groupStatusWorking,
name: n,
sessions: make(map[int64]*session.Session),
}
}

func (c *Group) Member(uid int64) *session.Session {
// Member returns specified UID's session
func (c *Group) Member(uid int64) (*session.Session, error) {
c.RLock()
defer c.RUnlock()

return c.uids[uid]
for _, s := range c.sessions {
if s.Uid() == uid {
return s, nil
}
}

return nil, ErrMemberNotFound
}

// Members returns all member's UID in current group
func (c *Group) Members() []int64 {
c.RLock()
defer c.RUnlock()

return c.members
members := []int64{}
for _, s := range c.sessions {
members = append(members, s.Uid())
}

return members
}

// Push message to partial client, which filter return true
Expand All @@ -86,12 +101,11 @@ func (c *Group) Multicast(route string, v interface{}, filter SessionFilter) err
c.RLock()
defer c.RUnlock()

for _, s := range c.uids {
for _, s := range c.sessions {
if !filter(s) {
continue
}
err = s.Push(route, data)
if err != nil {
if err = s.Push(route, data); err != nil {
log.Println(err.Error())
}
}
Expand All @@ -117,71 +131,62 @@ func (c *Group) Broadcast(route string, v interface{}) error {
c.RLock()
defer c.RUnlock()

for _, s := range c.uids {
err = s.Push(route, data)
if err != nil {
log.Println(err.Error())
for _, s := range c.sessions {
if err = s.Push(route, data); err != nil {
log.Println(fmt.Sprintf("Session push message error, ID=%d, Uid=%d, Error=%s", s.ID(), s.Uid(), err.Error()))
}
}

return err
}

// IsContain decide whether a UID is contained in current group
func (c *Group) IsContain(uid int64) bool {
c.RLock()
defer c.RUnlock()

if _, ok := c.uids[uid]; ok {
return true
}

return false
_, err := c.Member(uid)
return err == nil
}

// Add add session to group
func (c *Group) Add(session *session.Session) error {
if c.isClosed() {
return ErrClosedGroup
}

if env.debug {
log.Println(fmt.Sprintf("Add session to group %s, Uid=%d", c.name, session.Uid()))
}

c.Lock()
defer c.Unlock()

uid := session.Uid()
c.uids[uid] = session
c.members = append(c.members, uid)
id := session.ID()
_, ok := c.sessions[session.ID()]
if ok {
return ErrSessionDuplication
}

c.sessions[id] = session
return nil
}

func (c *Group) Leave(uid int64) error {
// Leave remove specified UID related session from group
func (c *Group) Leave(s *session.Session) error {
if c.isClosed() {
return ErrClosedGroup
}

if !c.IsContain(uid) {
return ErrMemberNotFound
}

if env.debug {
log.Println(fmt.Sprintf("Remove session from group %s, Uid=%d", c.name, uid))
log.Println(fmt.Sprintf("Remove session from group %s, Uid=%d", c.name, s.Uid()))
}

c.Lock()
defer c.Unlock()

var temp []int64
for i, u := range c.members {
if u == uid {
temp = append(temp, c.members[:i]...)
c.members = append(temp, c.members[(i+1):]...)
break
}
}
delete(c.uids, uid)

delete(c.sessions, s.ID())
return nil
}

// LeaveAll clear all sessions in the group
func (c *Group) LeaveAll() error {
if c.isClosed() {
return ErrClosedGroup
Expand All @@ -190,9 +195,7 @@ func (c *Group) LeaveAll() error {
c.Lock()
defer c.Unlock()

c.uids = make(map[int64]*session.Session)
c.members = make([]int64, 0)

c.sessions = make(map[int64]*session.Session)
return nil
}

Expand All @@ -201,7 +204,7 @@ func (c *Group) Count() int {
c.RLock()
defer c.RUnlock()

return len(c.uids)
return len(c.sessions)
}

func (c *Group) isClosed() bool {
Expand All @@ -220,8 +223,6 @@ func (c *Group) Close() error {
atomic.StoreInt32(&c.status, groupStatusClosed)

// release all reference
c.uids = make(map[int64]*session.Session)
c.members = []int64{}

c.sessions = make(map[int64]*session.Session)
return nil
}

0 comments on commit 985742d

Please sign in to comment.