Skip to content

Commit

Permalink
Add JSON encoding support over WebSocket connections. Merge heroiclab…
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Nov 6, 2017
1 parent bcca141 commit bcb1673
Show file tree
Hide file tree
Showing 20 changed files with 197 additions and 85 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
## [Unreleased]
### Added
- RUDP transport option for client connections.
- Accept JSON payload over WebSocket connections.

### Changed
- Consistently Use strings for all ID types.
- Consistently use strings for all ID types.
- Improve runtime hook lookup behaviour.

### [1.1.0] - 2017-10-17
Expand Down
18 changes: 15 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (

"nakama/pkg/social"

"runtime"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/jsonpb"
_ "github.com/lib/pq"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"runtime"
)

const (
Expand Down Expand Up @@ -84,11 +86,21 @@ func main() {
// Check migration status and log if the schema has diverged.
cmd.MigrationStartupCheck(multiLogger, db)

jsonpbMarshaler := &jsonpb.Marshaler{
EnumsAsInts: true,
EmitDefaults: false,
Indent: "",
OrigName: false,
}
jsonpbUnmarshaler := &jsonpb.Unmarshaler{
AllowUnknownFields: false,
}

trackerService := server.NewTrackerService(config.GetName())
statsService := server.NewStatsService(jsonLogger, config, semver, trackerService, startedAt)
matchmakerService := server.NewMatchmakerService(config.GetName())
sessionRegistry := server.NewSessionRegistry(jsonLogger, config, trackerService, matchmakerService)
messageRouter := server.NewMessageRouterService(sessionRegistry)
messageRouter := server.NewMessageRouterService(jsonpbMarshaler, sessionRegistry)
presenceNotifier := server.NewPresenceNotifier(jsonLogger, config.GetName(), trackerService, messageRouter)
trackerService.AddDiffListener(presenceNotifier.HandleDiff)
notificationService := server.NewNotificationService(jsonLogger, db, trackerService, messageRouter, config.GetSocial().Notification)
Expand All @@ -101,7 +113,7 @@ func main() {
socialClient := social.NewClient(5 * time.Second)
purchaseService := server.NewPurchaseService(jsonLogger, multiLogger, db, config.GetPurchase())
pipeline := server.NewPipeline(config, db, trackerService, matchmakerService, messageRouter, sessionRegistry, socialClient, runtimePool, purchaseService, notificationService)
authService := server.NewAuthenticationService(jsonLogger, config, db, statsService, sessionRegistry, socialClient, pipeline, runtimePool)
authService := server.NewAuthenticationService(jsonLogger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, statsService, sessionRegistry, socialClient, pipeline, runtimePool)
dashboardService := server.NewDashboardService(jsonLogger, multiLogger, semver, config, statsService)

gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1
Expand Down
5 changes: 3 additions & 2 deletions pkg/multicode/client_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ package multicode
import (
"errors"
"fmt"
"github.com/wirepair/netcode"
"go.uber.org/zap"
"net"
"sync"
"time"

"github.com/wirepair/netcode"
"go.uber.org/zap"
)

const (
Expand Down
3 changes: 2 additions & 1 deletion pkg/multicode/netcode_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
package multicode

import (
"net"

"github.com/wirepair/netcode"
"go.uber.org/zap"
"net"
)

type NetcodeData struct {
Expand Down
5 changes: 3 additions & 2 deletions pkg/multicode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@
package multicode

import (
"github.com/wirepair/netcode"
"go.uber.org/zap"
"net"
"sync"
"sync/atomic"
"time"

"github.com/wirepair/netcode"
"go.uber.org/zap"
)

// Not a true connection limit, only used to initialise/allocate various buffer and data structure sizes.
Expand Down
3 changes: 2 additions & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"io/ioutil"
"nakama/pkg/flags"

"net"

"github.com/go-yaml/yaml"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"net"
)

// Config interface is the Nakama Core configuration
Expand Down
64 changes: 51 additions & 13 deletions server/message_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package server

import (
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)
Expand All @@ -25,12 +26,14 @@ type MessageRouter interface {
}

type messageRouterService struct {
registry *SessionRegistry
jsonpbMarshaler *jsonpb.Marshaler
registry *SessionRegistry
}

func NewMessageRouterService(registry *SessionRegistry) *messageRouterService {
func NewMessageRouterService(jsonpbMarshaler *jsonpb.Marshaler, registry *SessionRegistry) *messageRouterService {
return &messageRouterService{
registry: registry,
jsonpbMarshaler: jsonpbMarshaler,
registry: registry,
}
}

Expand All @@ -39,21 +42,56 @@ func (m *messageRouterService) Send(logger *zap.Logger, ps []Presence, msg proto
return
}

payload, err := proto.Marshal(msg)
if err != nil {
logger.Error("Could not marshall message to byte[]", zap.Error(err))
return
// Group together target sessions by format.
jsonSessionIDs := make([]string, 0)
protobufSessionIDs := make([]string, 0)
for _, p := range ps {
switch p.Meta.Format {
case SessionFormatJson:
jsonSessionIDs = append(jsonSessionIDs, p.ID.SessionID)
default:
protobufSessionIDs = append(protobufSessionIDs, p.ID.SessionID)
}
}

for _, p := range ps {
session := m.registry.Get(p.ID.SessionID)
if session != nil {
// Encode and route together for Protobuf format.
if len(protobufSessionIDs) != 0 {
payload, err := proto.Marshal(msg)
if err != nil {
logger.Error("Could not marshall message to byte[]", zap.Error(err))
return
}
for _, sessionID := range protobufSessionIDs {
session := m.registry.Get(sessionID)
if session == nil {
logger.Warn("No session to route to", zap.Any("sid", sessionID))
continue
}
err := session.SendBytes(payload, reliable)
if err != nil {
logger.Error("Failed to route to", zap.Any("p", p), zap.Error(err))
logger.Error("Failed to route to", zap.Any("sid", sessionID), zap.Error(err))
}
}
}

// Encode and route together for JSON format.
if len(jsonSessionIDs) != 0 {
payload, err := m.jsonpbMarshaler.MarshalToString(msg)
if err != nil {
logger.Error("Could not marshall message to json", zap.Error(err))
return
}
payloadBytes := []byte(payload)
for _, sessionID := range jsonSessionIDs {
session := m.registry.Get(sessionID)
if session == nil {
logger.Warn("No session to route to", zap.Any("sid", sessionID))
continue
}
err := session.SendBytes(payloadBytes, reliable)
if err != nil {
logger.Error("Failed to route to", zap.Any("sid", sessionID), zap.Error(err))
}
} else {
logger.Warn("No session to route to", zap.Any("p", p))
}
}
}
1 change: 1 addition & 0 deletions server/pipeline_friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"encoding/json"
"fmt"

"github.com/lib/pq"
"go.uber.org/zap"
)
Expand Down
1 change: 1 addition & 0 deletions server/pipeline_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"

"fmt"

"github.com/lib/pq"
"go.uber.org/zap"
)
Expand Down
2 changes: 2 additions & 0 deletions server/pipeline_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func (p *pipeline) matchCreate(logger *zap.Logger, session session, envelope *En

p.tracker.Track(session.ID(), "match:"+matchID, session.UserID(), PresenceMeta{
Handle: handle,
Format: session.Format(),
})

self := &UserPresence{
Expand Down Expand Up @@ -123,6 +124,7 @@ func (p *pipeline) matchJoin(logger *zap.Logger, session session, envelope *Enve

p.tracker.Track(session.ID(), topic, session.UserID(), PresenceMeta{
Handle: handle,
Format: session.Format(),
})

userPresences := make([]*UserPresence, len(ps)+1)
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline_matchmake.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *pipeline) matchmakeAdd(logger *zap.Logger, session session, envelope *E
}

matchmakerProfile := &MatchmakerProfile{
Meta: PresenceMeta{Handle: session.Handle()},
Meta: PresenceMeta{Handle: session.Handle(), Format: session.Format()},
RequiredCount: int(requiredCount),
Properties: properties,
Filters: filters,
Expand Down
3 changes: 2 additions & 1 deletion server/pipeline_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package server

import (
"strings"

"github.com/yuin/gopher-lua"
"go.uber.org/zap"
"strings"
)

func (p *pipeline) rpc(logger *zap.Logger, session session, envelope *Envelope) {
Expand Down
1 change: 1 addition & 0 deletions server/pipeline_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (p *pipeline) topicJoin(logger *zap.Logger, session session, envelope *Enve
// Track the presence, and gather current member list.
isNewPresence := p.tracker.Track(session.ID(), trackerTopic, session.UserID(), PresenceMeta{
Handle: handle,
Format: session.Format(),
})
presences := p.tracker.ListByTopic(trackerTopic)

Expand Down
9 changes: 8 additions & 1 deletion server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ package server

import "go.uber.org/zap"

type SessionFormat int

const (
SessionFormatProtobuf SessionFormat = 0
SessionFormatJson = 1
)

type session interface {
Logger() *zap.Logger
ID() string
Expand All @@ -26,10 +33,10 @@ type session interface {

Lang() string
Expiry() int64

Consume(func(logger *zap.Logger, session session, envelope *Envelope, reliable bool))
Unregister()

Format() SessionFormat
Send(envelope *Envelope, reliable bool) error
SendBytes(payload []byte, reliable bool) error

Expand Down
26 changes: 13 additions & 13 deletions server/session_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ import (
"strings"
"time"

"nakama/pkg/httputil"
"nakama/pkg/multicode"
"nakama/pkg/social"

"encoding/base64"
"net"

"github.com/dgrijalva/jwt-go"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
Expand All @@ -44,8 +47,6 @@ import (
"github.com/yuin/gopher-lua"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
"nakama/pkg/httputil"
"net"
)

const (
Expand Down Expand Up @@ -86,7 +87,7 @@ type authenticationService struct {
}

// NewAuthenticationService creates a new AuthenticationService
func NewAuthenticationService(logger *zap.Logger, config Config, db *sql.DB, statService StatsService, registry *SessionRegistry, socialClient *social.Client, pipeline *pipeline, runtimePool *RuntimePool) *authenticationService {
func NewAuthenticationService(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, statService StatsService, registry *SessionRegistry, socialClient *social.Client, pipeline *pipeline, runtimePool *RuntimePool) *authenticationService {
a := &authenticationService{
logger: logger,
config: config,
Expand All @@ -107,15 +108,8 @@ func NewAuthenticationService(logger *zap.Logger, config Config, db *sql.DB, sta
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
jsonpbMarshaler: &jsonpb.Marshaler{
EnumsAsInts: true,
EmitDefaults: false,
Indent: "",
OrigName: false,
},
jsonpbUnmarshaler: &jsonpb.Unmarshaler{
AllowUnknownFields: false,
},
jsonpbMarshaler: jsonpbMarshaler,
jsonpbUnmarshaler: jsonpbUnmarshaler,
}

a.configure()
Expand Down Expand Up @@ -183,14 +177,20 @@ func (a *authenticationService) configure() {
lang = "en"
}

sformat := SessionFormatProtobuf
format := r.URL.Query().Get("format")
if format == "json" {
sformat = SessionFormatJson
}

conn, err := a.upgrader.Upgrade(w, r, nil)
if err != nil {
// http.Error is invoked automatically from within the Upgrade func
a.logger.Warn("Could not upgrade to WebSocket", zap.Error(err))
return
}

a.registry.addWS(uid, handle, lang, exp, conn, a.pipeline.processRequest)
a.registry.addWS(uid, handle, lang, sformat, exp, conn, a.jsonpbMarshaler, a.jsonpbUnmarshaler, a.pipeline.processRequest)
}).Methods("GET", "OPTIONS")

a.mux.HandleFunc("/runtime/{path}", func(w http.ResponseWriter, r *http.Request) {
Expand Down
Loading

0 comments on commit bcb1673

Please sign in to comment.