Skip to content

Commit

Permalink
Wire up runtime scripts. (heroiclabs#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Apr 22, 2018
1 parent c948017 commit ccdb0fd
Show file tree
Hide file tree
Showing 455 changed files with 124,850 additions and 1,584 deletions.
130 changes: 116 additions & 14 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,14 @@
[[constraint]]
name = "github.com/gorilla/websocket"
revision = "292fd08b2560ad524ee37396253d71570339a821"

[[constraint]]
name = "github.com/yuin/gopher-lua"
revision = "7d7bc8747e3f614c5c587729a341fe7d8903cdb8"

[[constraint]]
name = "github.com/gorhill/cronexpr"
revision = "d520615e531a6bf3fb69406b9eba718261285ec8"

[[constraint]]
name = "golang.org/x/crypto"
8 changes: 6 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ func main() {
registry := server.NewSessionRegistry()
tracker := server.StartLocalTracker(jsonLogger, registry, jsonpbMarshaler, config.GetName())
router := server.NewLocalMessageRouter(registry, tracker, jsonpbMarshaler)
pipeline := server.NewPipeline(config, db, tracker, router, registry)
apiServer := server.StartApiServer(jsonLogger, db, config, tracker, registry, pipeline, jsonpbMarshaler, jsonpbUnmarshaler)
runtimePool, err := server.NewRuntimePool(jsonLogger, multiLogger, db, config, registry, tracker, router)
if err != nil {
multiLogger.Fatal("Failed initializing runtime modules", zap.Error(err))
}
pipeline := server.NewPipeline(config, db, registry, tracker, router, runtimePool)
apiServer := server.StartApiServer(jsonLogger, db, config, registry, tracker, pipeline, runtimePool, jsonpbMarshaler, jsonpbUnmarshaler)

// Respect OS stop signals.
c := make(chan os.Signal, 2)
Expand Down
6 changes: 4 additions & 2 deletions server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ type ApiServer struct {
logger *zap.Logger
db *sql.DB
config Config
runtimePool *RuntimePool
random *rand.Rand
grpcServer *grpc.Server
grpcGatewayServer *http.Server
}

func StartApiServer(logger *zap.Logger, db *sql.DB, config Config, tracker Tracker, registry *SessionRegistry, pipeline *pipeline, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler) *ApiServer {
func StartApiServer(logger *zap.Logger, db *sql.DB, config Config, registry *SessionRegistry, tracker Tracker, pipeline *pipeline, runtimePool *RuntimePool, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler) *ApiServer {
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(SecurityInterceptorFunc(logger, config)),
)
Expand All @@ -63,6 +64,7 @@ func StartApiServer(logger *zap.Logger, db *sql.DB, config Config, tracker Track
logger: logger,
db: db,
config: config,
runtimePool: runtimePool,
random: rand.New(rand.NewSource(time.Now().UnixNano())),
grpcServer: grpcServer,
}
Expand Down Expand Up @@ -94,7 +96,7 @@ func StartApiServer(logger *zap.Logger, db *sql.DB, config Config, tracker Track
CORSOrigins := handlers.AllowedOrigins([]string{"*"})

grpcGatewayRouter := mux.NewRouter()
grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, tracker, registry, jsonpbMarshaler, jsonpbUnmarshaler, pipeline.processRequest))
grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, registry, tracker, jsonpbMarshaler, jsonpbUnmarshaler, pipeline.processRequest))
grpcGatewayRouter.NewRoute().Handler(grpcGateway)

handlerWithCORS := handlers.CORS(CORSHeaders, CORSOrigins)(grpcGatewayRouter)
Expand Down
4 changes: 4 additions & 0 deletions server/api_authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ func (s *ApiServer) AuthenticateSteamFunc(ctx context.Context, in *api.Authentic

func generateToken(config Config, userID, username string) string {
exp := time.Now().UTC().Add(time.Duration(config.GetSession().TokenExpiryMs) * time.Millisecond).Unix()
return generateTokenWithExpiry(config, userID, username, exp)
}

func generateTokenWithExpiry(config Config, userID, username string, exp int64) string {
token := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
"uid": userID,
"exp": exp,
Expand Down
60 changes: 58 additions & 2 deletions server/api_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,65 @@ import (
"golang.org/x/net/context"
"github.com/heroiclabs/nakama/api"
"go.uber.org/zap"
"strings"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/yuin/gopher-lua"
"github.com/satori/go.uuid"
"github.com/golang/protobuf/ptypes/wrappers"
)

func (s *ApiServer) RpcFunc(ctx context.Context, in *api.Rpc) (*api.Rpc, error) {
s.logger.Info("RPC called", zap.Any("userID", ctx.Value(ctxUserIDKey{})), zap.Any("username", ctx.Value(ctxUsernameKey{})), zap.Any("expiry", ctx.Value(ctxExpiryKey{})))
return &api.Rpc{Id: in.Id, Payload: in.Payload, HttpKey: in.HttpKey}, nil
if in.Id == "" {
return nil, status.Error(codes.InvalidArgument, "RPC ID must be set")
}

id := strings.ToLower(in.Id)

if !s.runtimePool.HasRPC(id) {
return nil, status.Error(codes.NotFound, "RPC function not found")
}

runtime := s.runtimePool.Get()
lf := runtime.GetRuntimeCallback(RPC, id)
if lf == nil {
s.runtimePool.Put(runtime)
return nil, status.Error(codes.NotFound, "RPC function not found")
}

uid := ""
username := ""
expiry := int64(0)
if u := ctx.Value(ctxUserIDKey{}); u != nil {
uid = u.(uuid.UUID).String()
}
if u := ctx.Value(ctxUsernameKey{}); u != nil {
username = u.(string)
}
if e := ctx.Value(ctxExpiryKey{}); e != nil {
expiry = e.(int64)
}

result, fnErr := runtime.InvokeFunctionRPC(lf, uid, username, expiry, "", in.Payload.Value)
s.runtimePool.Put(runtime)
if fnErr != nil {
s.logger.Error("Runtime RPC function caused an error", zap.String("id", in.Id), zap.Error(fnErr))
if apiErr, ok := fnErr.(*lua.ApiError); ok && !s.config.GetLog().Verbose {
msg := apiErr.Object.String()
if strings.HasPrefix(msg, lf.Proto.SourceName) {
msg = msg[len(lf.Proto.SourceName):]
msgParts := strings.SplitN(msg, ": ", 2)
if len(msgParts) == 2 {
msg = msgParts[1]
} else {
msg = msgParts[0]
}
}
return nil, status.Error(codes.Aborted, msg)
} else {
return nil, status.Error(codes.Aborted, fnErr.Error())
}
}

return &api.Rpc{Payload: &wrappers.StringValue{Value: result}}, nil
}
36 changes: 23 additions & 13 deletions server/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,34 @@ import (
)

type pipeline struct {
config Config
db *sql.DB
tracker Tracker
router MessageRouter
registry *SessionRegistry
config Config
db *sql.DB
registry *SessionRegistry
tracker Tracker
router MessageRouter
runtimePool *RuntimePool
}

func NewPipeline(config Config, db *sql.DB, tracker Tracker, router MessageRouter, registry *SessionRegistry) *pipeline {
func NewPipeline(config Config, db *sql.DB, registry *SessionRegistry, tracker Tracker, router MessageRouter, runtimePool *RuntimePool) *pipeline {
return &pipeline{
config: config,
db: db,
tracker: tracker,
router: router,
registry: registry,
config: config,
db: db,
registry: registry,
tracker: tracker,
router: router,
runtimePool: runtimePool,
}
}

func (p *pipeline) processRequest(logger *zap.Logger, session session, envelope *rtapi.Envelope) {
// FIXME test by echoing back message.
session.Send(envelope)
switch envelope.Message.(type) {
case *rtapi.Envelope_Rpc:
p.rpc(logger, session, envelope)
default:
session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
Code: int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
Message: "Unrecognized payload",
}}})
return
}
}
Loading

0 comments on commit ccdb0fd

Please sign in to comment.