Skip to content

Commit

Permalink
Improve transaction retry behaviour. (heroiclabs#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Apr 22, 2018
1 parent 425c599 commit 2d466c6
Show file tree
Hide file tree
Showing 19 changed files with 1,373 additions and 72 deletions.
7 changes: 6 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@
[[constraint]]
name = "github.com/stretchr/testify"
version = "~1.2.1"

[[constraint]]
name = "github.com/cockroachdb/cockroach-go"
revision = "59c0560478b705bf9bd12f9252224a0fad7c87df"
26 changes: 18 additions & 8 deletions server/api_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package server

import (
"database/sql"
"strconv"
"strings"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/golang/protobuf/ptypes/empty"
"github.com/heroiclabs/nakama/api"
"github.com/lib/pq"
Expand Down Expand Up @@ -72,37 +72,47 @@ func (s *ApiServer) LinkDevice(ctx context.Context, in *api.AccountDevice) (*emp
return nil, status.Error(codes.InvalidArgument, "Device ID invalid, must be 10-128 bytes.")
}

fnErr := Transact(s.logger, s.db, func(tx *sql.Tx) error {
tx, err := s.db.Begin()
if err != nil {
s.logger.Error("Could not begin database transaction.", zap.Error(err))
return nil, status.Error(codes.Internal, "Error linking Device ID.")
}

err = crdb.ExecuteInTx(ctx, tx, func() error {
userID := ctx.Value(ctxUserIDKey{})

var dbDeviceIdLinkedUser int64
err := tx.QueryRow("SELECT COUNT(id) FROM user_device WHERE id = $1 AND user_id = $2 LIMIT 1", deviceID, userID).Scan(&dbDeviceIdLinkedUser)
if err != nil {
s.logger.Error("Cannot link device ID.", zap.Error(err), zap.Any("input", in))
return status.Error(codes.Internal, "Error linking Device ID.")
return err
}

if dbDeviceIdLinkedUser == 0 {
_, err = tx.Exec("INSERT INTO user_device (id, user_id) VALUES ($1, $2)", deviceID, userID)
if err != nil {
if e, ok := err.(*pq.Error); ok && e.Code == dbErrorUniqueViolation {
return status.Error(codes.AlreadyExists, "Device ID already in use.")
return StatusError(codes.AlreadyExists, "Device ID already in use.", err)
}
s.logger.Error("Cannot link device ID.", zap.Error(err), zap.Any("input", in))
return status.Error(codes.Internal, "Error linking Device ID.")
return err
}
}

_, err = tx.Exec("UPDATE users SET update_time = now() WHERE id = $1", userID)
if err != nil {
s.logger.Error("Cannot update users table while linking.", zap.Error(err), zap.Any("input", in))
return status.Error(codes.Internal, "Error linking Device ID.")
return err
}
return nil
})

if fnErr != nil {
return nil, fnErr
if err != nil {
if e, ok := err.(*statusError); ok {
return nil, e.Status()
}
s.logger.Error("Error in database transaction.", zap.Error(err))
return nil, status.Error(codes.Internal, "Error linking Device ID.")
}

return &empty.Empty{}, nil
Expand Down
26 changes: 18 additions & 8 deletions server/api_unlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package server

import (
"database/sql"
"strconv"
"strings"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/golang/protobuf/ptypes/empty"
"github.com/heroiclabs/nakama/api"
"go.uber.org/zap"
Expand Down Expand Up @@ -61,7 +61,13 @@ func (s *ApiServer) UnlinkDevice(ctx context.Context, in *api.AccountDevice) (*e
return nil, status.Error(codes.InvalidArgument, "A device ID must be supplied.")
}

fnErr := Transact(s.logger, s.db, func(tx *sql.Tx) error {
tx, err := s.db.Begin()
if err != nil {
s.logger.Error("Could not begin database transaction.", zap.Error(err))
return nil, status.Error(codes.Internal, "Could not unlink Device ID.")
}

err = crdb.ExecuteInTx(ctx, tx, func() error {
userID := ctx.Value(ctxUserIDKey{})

query := `DELETE FROM user_device WHERE id = $2 AND user_id = $1
Expand All @@ -77,26 +83,30 @@ AND (EXISTS (SELECT id FROM users WHERE id = $1 AND
res, err := tx.Exec(query, userID, in.Id)
if err != nil {
s.logger.Error("Could not unlink device ID.", zap.Error(err), zap.Any("input", in))
return status.Error(codes.Internal, "Could not unlink Device ID.")
return err
}
if count, _ := res.RowsAffected(); count == 0 {
return status.Error(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.")
return StatusError(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.", ErrRowsAffectedCount)
}

res, err = tx.Exec("UPDATE users SET update_time = now() WHERE id = $1", userID)
if err != nil {
s.logger.Error("Could not unlink device ID.", zap.Error(err), zap.Any("input", in))
return status.Error(codes.Internal, "Could not unlink Device ID.")
return err
}
if count, _ := res.RowsAffected(); count == 0 {
return status.Error(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.")
return StatusError(codes.PermissionDenied, "Cannot unlink last account identifier. Check profile exists and is not last link.", ErrRowsAffectedCount)
}

return nil
})

if fnErr != nil {
return nil, fnErr
if err != nil {
if e, ok := err.(*statusError); ok {
return nil, e.Status()
}
s.logger.Error("Error in database transaction.", zap.Error(err))
return nil, status.Error(codes.Internal, "Could not unlink device ID.")
}

return &empty.Empty{}, nil
Expand Down
39 changes: 29 additions & 10 deletions server/core_authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"errors"

"context"
"github.com/cockroachdb/cockroach-go/crdb"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/heroiclabs/nakama/api"
"github.com/heroiclabs/nakama/social"
Expand Down Expand Up @@ -142,7 +144,14 @@ func AuthenticateDevice(logger *zap.Logger, db *sql.DB, deviceID, username strin

// Create a new account.
userID := uuid.Must(uuid.NewV4()).String()
fnErr := Transact(logger, db, func(tx *sql.Tx) error {

tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return "", "", false, status.Error(codes.Internal, "Error finding or creating user account.")
}

err = crdb.ExecuteInTx(context.Background(), tx, func() error {
query := `
INSERT INTO users (id, username, create_time, update_time)
SELECT $1 AS id,
Expand All @@ -159,35 +168,39 @@ WHERE NOT EXISTS
if err == sql.ErrNoRows {
// A concurrent write has inserted this device ID.
logger.Debug("Did not insert new user as device ID already exists.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
return status.Error(codes.Internal, "Error finding or creating user account.")
return StatusError(codes.Internal, "Error finding or creating user account.", err)
} else if e, ok := err.(*pq.Error); ok && e.Code == dbErrorUniqueViolation && strings.Contains(e.Message, "users_username_key") {
return status.Error(codes.AlreadyExists, "Username is already in use.")
return StatusError(codes.AlreadyExists, "Username is already in use.", err)
}
logger.Error("Cannot find or create user with device ID.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
return status.Error(codes.Internal, "Error finding or creating user account.")
return err
}

if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != 1 {
logger.Error("Did not insert new user.", zap.Int64("rows_affected", rowsAffectedCount))
return status.Error(codes.Internal, "Error finding or creating user account.")
return StatusError(codes.Internal, "Error finding or creating user account.", ErrRowsAffectedCount)
}

query = "INSERT INTO user_device (id, user_id) VALUES ($1, $2)"
result, err = tx.Exec(query, deviceID, userID)
if err != nil {
logger.Error("Cannot add device ID.", zap.Error(err), zap.String("deviceID", deviceID), zap.String("username", username), zap.Bool("create", create))
return status.Error(codes.Internal, "Error finding or creating user account.")
return err
}

if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != 1 {
logger.Error("Did not insert new user.", zap.Int64("rows_affected", rowsAffectedCount))
return status.Error(codes.Internal, "Error finding or creating user account.")
return StatusError(codes.Internal, "Error finding or creating user account.", ErrRowsAffectedCount)
}

return nil
})
if fnErr != nil {
return "", "", false, fnErr
if err != nil {
if e, ok := err.(*statusError); ok {
return "", "", false, e.Status()
}
logger.Error("Error in database transaction.", zap.Error(err))
return "", "", false, status.Error(codes.Internal, "Error finding or creating user account.")
}

return userID, username, true, nil
Expand Down Expand Up @@ -541,7 +554,13 @@ func importFacebookFriends(logger *zap.Logger, db *sql.DB, messageRouter Message
position := time.Now().UTC().Unix()
friendUserIDs := make([]uuid.UUID, 0)

err = Transact(logger, db, func(tx *sql.Tx) error {
tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return status.Error(codes.Internal, "Error importing Facebook friends.")
}

err = crdb.ExecuteInTx(context.Background(), tx, func() error {
if reset {
// Reset all friends for the current user, replacing them entirely with their Facebook friends.
// Note: will NOT remove blocked users.
Expand Down
29 changes: 24 additions & 5 deletions server/core_friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

"context"
"github.com/cockroachdb/cockroach-go/crdb"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/heroiclabs/nakama/api"
"github.com/lib/pq"
Expand Down Expand Up @@ -143,7 +145,14 @@ func AddFriends(logger *zap.Logger, db *sql.DB, messageRouter MessageRouter, use
}

notificationToSend := make(map[string]bool)
if err := Transact(logger, db, func(tx *sql.Tx) error {

tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return err
}

if err = crdb.ExecuteInTx(context.Background(), tx, func() error {
for id := range uniqueFriendIDs {
isFriendAccept, addFriendErr := addFriend(logger, tx, userID, id)
if addFriendErr == nil {
Expand Down Expand Up @@ -275,16 +284,20 @@ func DeleteFriends(logger *zap.Logger, db *sql.DB, currentUser uuid.UUID, ids []
uniqueFriendIDs[fid] = struct{}{}
}

err := Transact(logger, db, func(tx *sql.Tx) error {
tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return err
}

return crdb.ExecuteInTx(context.Background(), tx, func() error {
for id := range uniqueFriendIDs {
if deleteFriendErr := deleteFriend(logger, tx, currentUser, id); deleteFriendErr != nil {
return deleteFriendErr
}
}
return nil
})

return err
}

func deleteFriend(logger *zap.Logger, tx *sql.Tx, userID uuid.UUID, friendID string) error {
Expand Down Expand Up @@ -321,7 +334,13 @@ func BlockFriends(logger *zap.Logger, db *sql.DB, currentUser uuid.UUID, ids []s
uniqueFriendIDs[fid] = struct{}{}
}

return Transact(logger, db, func(tx *sql.Tx) error {
tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return err
}

return crdb.ExecuteInTx(context.Background(), tx, func() error {
for id := range uniqueFriendIDs {
if blockFriendErr := blockFriend(logger, tx, currentUser, id); blockFriendErr != nil {
return blockFriendErr
Expand Down
34 changes: 20 additions & 14 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"errors"
"fmt"

"context"
"github.com/cockroachdb/cockroach-go/crdb"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/heroiclabs/nakama/api"
"github.com/lib/pq"
Expand Down Expand Up @@ -342,10 +344,16 @@ WHERE
}

func StorageWriteObjects(logger *zap.Logger, db *sql.DB, authoritativeWrite bool, objects map[uuid.UUID][]*api.WriteStorageObject) (*api.StorageObjectAcks, codes.Code, error) {
returnCode := codes.OK
returnCode := codes.Internal
acks := &api.StorageObjectAcks{}

if err := Transact(logger, db, func(tx *sql.Tx) error {
tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return nil, codes.Internal, err
}

if err := crdb.ExecuteInTx(context.Background(), tx, func() error {
for ownerID, userObjects := range objects {
for _, object := range userObjects {
ack, writeErr := storageWriteObject(logger, tx, authoritativeWrite, ownerID, object)
Expand All @@ -364,15 +372,10 @@ func StorageWriteObjects(logger *zap.Logger, db *sql.DB, authoritativeWrite bool
}
return nil
}); err != nil {
// in case it is a commit/rollback error
if _, ok := err.(pq.Error); ok {
return nil, codes.Internal, err
}

return nil, returnCode, err
}

return acks, returnCode, nil
return acks, codes.OK, nil
}

func storageWriteObject(logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ownerID uuid.UUID, object *api.WriteStorageObject) (*api.StorageObjectAck, error) {
Expand Down Expand Up @@ -488,8 +491,15 @@ RETURNING collection, key, version`
}

func StorageDeleteObjects(logger *zap.Logger, db *sql.DB, authoritativeDelete bool, userObjectIDs map[uuid.UUID][]*api.DeleteStorageObjectId) (codes.Code, error) {
returnCode := codes.OK
if err := Transact(logger, db, func(tx *sql.Tx) error {
returnCode := codes.Internal

tx, err := db.Begin()
if err != nil {
logger.Error("Could not begin database transaction.", zap.Error(err))
return codes.Internal, err
}

if err = crdb.ExecuteInTx(context.Background(), tx, func() error {
for ownerID, objectIDs := range userObjectIDs {
for _, objectID := range objectIDs {
params := []interface{}{objectID.GetCollection(), objectID.GetKey(), ownerID}
Expand Down Expand Up @@ -519,10 +529,6 @@ func StorageDeleteObjects(logger *zap.Logger, db *sql.DB, authoritativeDelete bo
}
return nil
}); err != nil {
// in case it is a commit/rollback error
if _, ok := err.(pq.Error); ok {
return codes.Internal, err
}
return returnCode, err
}

Expand Down
Loading

0 comments on commit 2d466c6

Please sign in to comment.