Skip to content

Commit

Permalink
cluster,example: fix chat cluster demo
Browse files Browse the repository at this point in the history
Signed-off-by: Lonng <[email protected]>
  • Loading branch information
lonng committed Jun 30, 2019
1 parent abfc3be commit 33d31c2
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 49 deletions.
66 changes: 65 additions & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package cluster
import (
"context"
"fmt"
"sync"

"github.com/lonng/nano/cluster/clusterpb"
"github.com/lonng/nano/internal/log"
Expand All @@ -34,8 +35,10 @@ import (
type cluster struct {
// If cluster is not large enough, use slice is OK
currentNode *Node
members []*Member
rpcClient *rpcClient

mu sync.RWMutex
members []*Member
}

func newCluster(currentNode *Node) *cluster {
Expand Down Expand Up @@ -77,7 +80,9 @@ func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*

// Register services to current node
c.currentNode.handler.addRemoteService(req.MemberInfo)
c.mu.Lock()
c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo})
c.mu.Unlock()
return resp, nil
}

Expand Down Expand Up @@ -117,14 +122,73 @@ func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest

// Register services to current node
c.currentNode.handler.delMember(req.ServiceAddr)
c.mu.Lock()
if index == len(c.members)-1 {
c.members = c.members[:index]
} else {
c.members = append(c.members[:index], c.members[index+1:]...)
}
c.mu.Unlock()
return resp, nil
}

func (c *cluster) setRpcClient(client *rpcClient) {
c.rpcClient = client
}

func (c *cluster) remoteAddrs() []string {
var addrs []string
c.mu.RLock()
for _, m := range c.members {
addrs = append(addrs, m.memberInfo.ServiceAddr)
}
c.mu.RUnlock()
return addrs
}

func (c *cluster) initMembers(members []*clusterpb.MemberInfo) {
c.mu.Lock()
for _, info := range members {
c.members = append(c.members, &Member{
memberInfo: info,
})
}
c.mu.Unlock()
}

func (c *cluster) addMember(info *clusterpb.MemberInfo) {
c.mu.Lock()
var found bool
for _, member := range c.members {
if member.memberInfo.ServiceAddr == info.ServiceAddr {
member.memberInfo = info
found = true
break
}
}
if !found {
c.members = append(c.members, &Member{
memberInfo: info,
})
}
c.mu.Unlock()
}

func (c *cluster) delMember(addr string) {
c.mu.Lock()
var index = -1
for i, member := range c.members {
if member.memberInfo.ServiceAddr == addr {
index = i
break
}
}
if index != -1 {
if index == len(c.members)-1 {
c.members = c.members[:index]
} else {
c.members = append(c.members[:index], c.members[index+1:]...)
}
}
c.mu.Unlock()
}
11 changes: 9 additions & 2 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,14 @@ func (h *LocalHandler) handle(conn net.Conn) {

// guarantee agent related resource be destroyed
defer func() {
remotes := agent.session.Router().Remote()
request := &clusterpb.SessionClosedRequest{
SessionId: agent.session.ID(),
}
for _, remote := range remotes {

members := h.currentNode.cluster.remoteAddrs()
log.Println("===>", len(members))
for _, remote := range members {
log.Println("Notify remote server success", remote)
pool, err := h.currentNode.rpcClient.getConnPool(remote)
if err != nil {
log.Println("Cannot retrieve connection pool for address", remote, err)
Expand All @@ -204,6 +207,10 @@ func (h *LocalHandler) handle(conn net.Conn) {
_, err = client.SessionClosed(context.Background(), request)
if err != nil {
log.Println("Cannot closed session in remote address", remote, err)
continue
}
if env.Debug {
log.Println("Notify remote server success", remote)
}
}

Expand Down
3 changes: 3 additions & 0 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (n *Node) initNode() error {
resp, err := client.Register(context.Background(), request)
if err == nil {
n.handler.initRemoteService(resp.Members)
n.cluster.initMembers(resp.Members)
break
}
log.Println("Register current node to cluster failed", err, "and will retry in", n.RetryInterval.String())
Expand Down Expand Up @@ -374,11 +375,13 @@ func (n *Node) HandleResponse(_ context.Context, req *clusterpb.ResponseMessage)

func (n *Node) NewMember(_ context.Context, req *clusterpb.NewMemberRequest) (*clusterpb.NewMemberResponse, error) {
n.handler.addRemoteService(req.MemberInfo)
n.cluster.addMember(req.MemberInfo)
return &clusterpb.NewMemberResponse{}, nil
}

func (n *Node) DelMember(_ context.Context, req *clusterpb.DelMemberRequest) (*clusterpb.DelMemberResponse, error) {
n.handler.delMember(req.ServiceAddr)
n.cluster.delMember(req.ServiceAddr)
return &clusterpb.DelMemberResponse{}, nil
}

Expand Down
47 changes: 44 additions & 3 deletions examples/cluster/chat/chat_service.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,60 @@
package chat

import (
"fmt"
"log"

"github.com/lonng/nano"
"github.com/lonng/nano/component"
"github.com/lonng/nano/examples/cluster/protocol"
"github.com/lonng/nano/session"
"github.com/pingcap/errors"
)

type RoomService struct {
component.Base
group *nano.Group
}

func newRoomService() *RoomService {
return &RoomService{}
return &RoomService{
group: nano.NewGroup("all-users"),
}
}

func (rs *RoomService) JoinRoom(s *session.Session, msg *protocol.JoinRoomRequest) error {
if err := s.Bind(msg.MasterUid); err != nil {
return errors.Trace(err)
}

broadcast := &protocol.NewUserBroadcast{
Content: fmt.Sprintf("User user join: %v", msg.Nickname),
}
if err := rs.group.Broadcast("onNewUser", broadcast); err != nil {
return errors.Trace(err)
}
return rs.group.Add(s)
}

type SyncMessage struct {
Name string `json:"name"`
Content string `json:"content"`
}

func (rs *RoomService) SyncMessage(s *session.Session, msg *SyncMessage) error {
// Send an RPC to master server to stats
if err := s.RPC("TopicService.Stats", &protocol.MasterStats{Uid: s.UID()}); err != nil {
return errors.Trace(err)
}

// Sync message to all members in this room
return rs.group.Broadcast("onMessage", msg)
}

func (cs *RoomService) JoinTopic(s *session.Session, msg []byte) error {
return errors.Errorf("not implement")
func (rs *RoomService) userDisconnected(s *session.Session) {
if err := rs.group.Leave(s); err != nil {
log.Println("Remove user from group failed", s.UID(), err)
return
}
log.Println("User session disconnected", s.UID())
}
9 changes: 8 additions & 1 deletion examples/cluster/chat/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package chat

import "github.com/lonng/nano/component"
import (
"github.com/lonng/nano/component"
"github.com/lonng/nano/session"
)

var (
// All services in master server
Expand All @@ -12,3 +15,7 @@ var (
func init() {
Services.Register(roomService)
}

func OnSessionClosed(s *session.Session) {
roomService.userDisconnected(s)
}
19 changes: 14 additions & 5 deletions examples/cluster/gate/gate_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,26 @@ func newBindService() *BindService {
return &BindService{}
}

type LoginRequest struct {
Nickname string `json:"nickname"`
}
type (
LoginRequest struct {
Nickname string `json:"nickname"`
}
LoginResponse struct {
Code int `json:"code"`
}
)

func (bs *BindService) Login(s *session.Session, msg *LoginRequest) error {
bs.nextGateUid++
uid := bs.nextGateUid
return s.RPC("TopicService.NewUser", &protocol.NewUserRequest{
request := &protocol.NewUserRequest{
Nickname: msg.Nickname,
GateUid: uid,
})
}
if err := s.RPC("TopicService.NewUser", request); err != nil {
return errors.Trace(err)
}
return s.Response(&LoginResponse{})
}

func (bs *BindService) BindChatServer(s *session.Session, msg []byte) error {
Expand Down
3 changes: 3 additions & 0 deletions examples/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ func runChat(args *cli.Context) error {
log.Println("Current chat server listen address", listen)
log.Println("Remote master server address", masterAddr)

// Register session closed callback
session.Lifetime.OnClosed(chat.OnSessionClosed)

// Startup Nano server with the specified listen address
nano.Listen(listen,
nano.WithAdvertiseAddr(masterAddr),
Expand Down
51 changes: 33 additions & 18 deletions examples/cluster/master/topic_service.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package master

import (
"fmt"
"log"
"strings"

"github.com/lonng/nano"
"github.com/lonng/nano/component"
"github.com/lonng/nano/examples/cluster/protocol"
"github.com/lonng/nano/session"
Expand All @@ -16,60 +15,76 @@ type User struct {
nickname string
gateId int64
masterId int64
balance int64
message int
}

type TopicService struct {
component.Base
nextUid int64
users map[int64]*User
group *nano.Group
}

func newTopicService() *TopicService {
return &TopicService{
users: map[int64]*User{},
group: nano.NewGroup("all-users"),
}
}

type ExistsMembersResponse struct {
Members string `json:"members"`
}

func (ts *TopicService) NewUser(s *session.Session, msg *protocol.NewUserRequest) error {
ts.nextUid++
uid := ts.nextUid
if err := s.Bind(uid); err != nil {
return errors.Trace(err)
}

var members []string
for _, u := range ts.users {
members = append(members, u.nickname)
}
err := s.Push("onMembers", &ExistsMembersResponse{Members: strings.Join(members, ",")})
if err != nil {
return errors.Trace(err)
}

user := &User{
session: s,
nickname: msg.Nickname,
gateId: msg.GateUid,
masterId: uid,
balance: 1000,
}
ts.users[uid] = user

broadcast := &protocol.NewUserBroadcast{
Content: fmt.Sprintf("User user join: %v", msg.Nickname),
}
if err := ts.group.Broadcast("onNewUser", broadcast); err != nil {
return errors.Trace(err)
chat := &protocol.JoinRoomRequest{
Nickname: msg.Nickname,
GateUid: msg.GateUid,
MasterUid: uid,
}
return ts.group.Add(s)
return s.RPC("RoomService.JoinRoom", chat)
}

type OpenTopicRequest struct {
Name string `json:"name"`
type UserBalanceResponse struct {
CurrentBalance int64 `json:"currentBalance"`
}

func (ts *TopicService) OpenTopic(s *session.Session, msg *OpenTopicRequest) error {
return errors.Errorf("not implemented: %v", msg)
func (ts *TopicService) Stats(s *session.Session, msg *protocol.MasterStats) error {
// It's OK to use map without lock because of this service running in main thread
user, found := ts.users[msg.Uid]
if !found {
return errors.Errorf("User not found: %v", msg.Uid)
}
user.message++
user.balance--
return s.Push("onBalance", &UserBalanceResponse{user.balance})
}

func (ts *TopicService) userDisconnected(s *session.Session) {
uid := s.UID()
delete(ts.users, uid)
if err := ts.group.Leave(s); err != nil {
log.Println("Remove user from group failed", s.UID(), err)
return
}
log.Println("User session disconnected", s.UID())
}
Loading

0 comments on commit 33d31c2

Please sign in to comment.