Skip to content

Commit

Permalink
Reliably release database resources before moving from one query to t…
Browse files Browse the repository at this point in the history
…he next.
  • Loading branch information
zyro committed Jan 5, 2019
1 parent f59989e commit e3f93b2
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
## [Unreleased]
### Fixed
- Set gateway timeout to match idle timeout value.
- Reliably release database resources before moving from one query to the next.

## [2.3.1] - 2019-01-04
### Added
Expand Down
4 changes: 3 additions & 1 deletion server/console_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ func (s *ConsoleServer) ListAccounts(ctx context.Context, in *empty.Empty) (*con
userIDs := make([]string, 0)
for rows.Next() {
var userID sql.NullString
if rows.Scan(&userID); err != nil {
if err = rows.Scan(&userID); err != nil {
rows.Close()
s.logger.Error("Could not list users.", zap.Error(err))
return nil, status.Error(codes.Internal, "An error occurred while trying to list users.")
}
userIDs = append(userIDs, userID.String)
}
rows.Close()

accounts := make([]*api.Account, 0)
for _, id := range userIDs {
Expand Down
31 changes: 16 additions & 15 deletions server/core_authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,20 +627,27 @@ func importFacebookFriends(ctx context.Context, logger *zap.Logger, db *sql.DB,
if err != nil {
return err
}
defer rows.Close()
var id string
query = "UPDATE users SET edge_count = edge_count - 1 WHERE id = $1"
statements := make([]string, 0)
params := make([]interface{}, 0)
for rows.Next() {
// Update edge count to reflect each removed friend.
var id string
err = rows.Scan(&id)
if err != nil {
rows.Close()
return err
}
result, err := tx.ExecContext(ctx, query, id)
params = append(params, id)
statements = append(statements, "$"+strconv.Itoa(len(params)))
}
rows.Close()

if len(statements) > 0 {
query = "UPDATE users SET edge_count = edge_count - 1 WHERE id IN (" + strings.Join(statements, ",") + ")"
result, err := tx.ExecContext(ctx, query, params...)
if err != nil {
return err
}
if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != 1 {
if rowsAffectedCount, _ := result.RowsAffected(); rowsAffectedCount != int64(len(statements)) {
return errors.New("error updating edge count after friend reset")
}
}
Expand Down Expand Up @@ -685,15 +692,9 @@ func importFacebookFriends(ctx context.Context, logger *zap.Logger, db *sql.DB,
for _, friendID := range possibleFriendIDs {
position := time.Now().UTC().UnixNano()

var r *sql.Rows
r, err = tx.QueryContext(ctx, "SELECT state FROM user_edge WHERE source_id = $1 AND destination_id = $2 AND state = 3", userID, friendID)
if r.Next() {
// User has previously blocked this friend, skip it.
r.Close()
continue
}

if err != nil {
var state sql.NullInt64
err = tx.QueryRowContext(ctx, "SELECT state FROM user_edge WHERE source_id = $1 AND destination_id = $2 AND state = 3", userID, friendID).Scan(&state)
if err != nil && err != sql.ErrNoRows {
logger.Error("Error checking block status in Facebook friend import.", zap.Error(err))
continue
}
Expand Down
3 changes: 2 additions & 1 deletion server/core_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
logger.Error("Error listing channel messages", zap.Error(err))
return nil, err
}
defer rows.Close()

messages := make([]*api.ChannelMessage, 0, limit)
var nextCursor, prevCursor *channelMessageListCursor
Expand Down Expand Up @@ -152,6 +151,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:

err = rows.Scan(&dbId, &dbCode, &dbSenderId, &dbUsername, &dbContent, &dbCreateTime, &dbUpdateTime)
if err != nil {
rows.Close()
logger.Error("Error parsing listed channel messages", zap.Error(err))
return nil, err
}
Expand Down Expand Up @@ -182,6 +182,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
}
}
}
rows.Close()

if incomingCursor != nil && !incomingCursor.IsNext {
// If this was a previous page listing, flip the results to their normal order and swap the cursors.
Expand Down
6 changes: 3 additions & 3 deletions server/core_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ WHERE (id = $1) AND (disable_time = '1970-01-01 00:00:00')`
// Errors here will not cause the join operation to fail.
logger.Error("Error looking up group admins to notify of join request.", zap.Error(err))
} else {
defer rows.Close()

for rows.Next() {
var id string
if err = rows.Scan(&id); err != nil {
Expand All @@ -352,6 +350,7 @@ WHERE (id = $1) AND (disable_time = '1970-01-01 00:00:00')`
},
}
}
rows.Close()
}

if len(notifications) > 0 {
Expand Down Expand Up @@ -1264,7 +1263,6 @@ WHERE group_edge.destination_id = $1`
logger.Debug("Could not list groups for a user.", zap.Error(err), zap.String("user_id", userID.String()))
return err
}
defer rows.Close()

deleteGroupsAndRelationships := make([]uuid.UUID, 0)
deleteRelationships := make([]uuid.UUID, 0)
Expand All @@ -1276,6 +1274,7 @@ WHERE group_edge.destination_id = $1`
var userState sql.NullInt64

if err := rows.Scan(&id, &edgeCount, &userState); err != nil {
rows.Close()
logger.Error("Could not parse rows when listing groups for a user.", zap.Error(err), zap.String("user_id", userID.String()))
return err
}
Expand All @@ -1291,6 +1290,7 @@ WHERE group_edge.destination_id = $1`
deleteRelationships = append(deleteRelationships, groupID)
}
}
rows.Close()

countOtherSuperadminsQuery := "SELECT COUNT(source_id) FROM group_edge WHERE source_id = $1 AND destination_id != $2 AND state = 0"
for _, g := range checkForOtherSuperadmins {
Expand Down
13 changes: 8 additions & 5 deletions server/core_leaderboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,
logger.Error("Error listing leaderboard records", zap.Error(err))
return nil, err
}
defer rows.Close()

rank := int64(0)
if incomingCursor != nil {
Expand Down Expand Up @@ -155,6 +154,7 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,

err = rows.Scan(&dbOwnerId, &dbUsername, &dbScore, &dbSubscore, &dbNumScore, &dbMaxNumScore, &dbMetadata, &dbCreateTime, &dbUpdateTime)
if err != nil {
rows.Close()
logger.Error("Error parsing listed leaderboard records", zap.Error(err))
return nil, err
}
Expand Down Expand Up @@ -199,6 +199,7 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,
}
}
}
rows.Close()

if incomingCursor != nil && !incomingCursor.IsNext {
// If this was a previous page listing, flip the results to their normal order and swap the cursors.
Expand Down Expand Up @@ -242,7 +243,6 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,
logger.Error("Error reading leaderboard records", zap.Error(err))
return nil, err
}
defer rows.Close()

ownerRecords = make([]*api.LeaderboardRecord, 0, len(ownerIds))

Expand All @@ -258,6 +258,7 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,
for rows.Next() {
err = rows.Scan(&dbOwnerId, &dbUsername, &dbScore, &dbSubscore, &dbNumScore, &dbMaxNumScore, &dbMetadata, &dbCreateTime, &dbUpdateTime)
if err != nil {
rows.Close()
logger.Error("Error parsing read leaderboard records", zap.Error(err))
return nil, err
}
Expand All @@ -283,6 +284,7 @@ func LeaderboardRecordsList(ctx context.Context, logger *zap.Logger, db *sql.DB,

ownerRecords = append(ownerRecords, record)
}
rows.Close()
}

// Bulk fill in the ranks of any owner records requested.
Expand Down Expand Up @@ -443,7 +445,7 @@ func LeaderboardRecordReadAll(ctx context.Context, logger *zap.Logger, db *sql.D
logger.Error("Error reading all leaderboard records for user", zap.String("user_id", userID.String()), zap.Error(err))
return nil, err
}
defer rows.Close()
// rows.Close() called in parseLeaderboardRecords

return parseLeaderboardRecords(logger, rows)
}
Expand Down Expand Up @@ -547,7 +549,7 @@ func getLeaderboardRecordsHaystack(ctx context.Context, logger *zap.Logger, db *
logger.Error("Could not execute leaderboard records list query", zap.Error(err))
return nil, err
}
defer firstRows.Close()
// firstRows.Close() called in parseLeaderboardRecords

firstRecords, err := parseLeaderboardRecords(logger, firstRows)
if err != nil {
Expand Down Expand Up @@ -579,7 +581,7 @@ func getLeaderboardRecordsHaystack(ctx context.Context, logger *zap.Logger, db *
logger.Error("Could not execute leaderboard records list query", zap.Error(err))
return nil, err
}
defer secondRows.Close()
// secondRows.Close() called in parseLeaderboardRecords

secondRecords, err := parseLeaderboardRecords(logger, secondRows)
if err != nil {
Expand All @@ -601,6 +603,7 @@ func getLeaderboardRecordsHaystack(ctx context.Context, logger *zap.Logger, db *
}

func parseLeaderboardRecords(logger *zap.Logger, rows *sql.Rows) ([]*api.LeaderboardRecord, error) {
defer rows.Close()
records := make([]*api.LeaderboardRecord, 0, 10)

var dbLeaderboardId string
Expand Down
3 changes: 2 additions & 1 deletion server/core_notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,14 @@ ORDER BY create_time ASC`+limitQuery, params...)
logger.Error("Could not retrieve notifications.", zap.Error(err))
return nil, err
}
defer rows.Close()

notifications := make([]*api.Notification, 0)
var lastCreateTime int64
for rows.Next() {
no := &api.Notification{Persistent: true, CreateTime: &timestamp.Timestamp{}}
var createTime pq.NullTime
if err := rows.Scan(&no.Id, &no.Subject, &no.Content, &no.Code, &no.SenderId, &createTime); err != nil {
rows.Close()
logger.Error("Could not scan notification from database.", zap.Error(err))
return nil, err
}
Expand All @@ -126,6 +126,7 @@ ORDER BY create_time ASC`+limitQuery, params...)
}
notifications = append(notifications, no)
}
rows.Close()

notificationList := &api.NotificationList{}
cursorBuf := new(bytes.Buffer)
Expand Down
8 changes: 5 additions & 3 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ LIMIT $2`
return err
}
}
defer rows.Close()
// rows.Close() called in storageListObjects

objects, err = storageListObjects(rows, cursor)
if err != nil {
Expand Down Expand Up @@ -144,7 +144,7 @@ LIMIT $3`
return err
}
}
defer rows.Close()
// rows.Close() called in storageListObjects

objects, err = storageListObjects(rows, cursor)
if err != nil {
Expand Down Expand Up @@ -191,7 +191,7 @@ LIMIT $3`
return err
}
}
defer rows.Close()
// rows.Close() called in storageListObjects

objects, err = storageListObjects(rows, cursor)
if err != nil {
Expand Down Expand Up @@ -260,6 +260,7 @@ func storageListObjects(rows *sql.Rows, cursor string) (*api.StorageObjectList,
var updateTime pq.NullTime
var userID sql.NullString
if err := rows.Scan(&o.Collection, &o.Key, &userID, &o.Value, &o.Version, &o.PermissionRead, &o.PermissionWrite, &createTime, &updateTime); err != nil {
rows.Close()
return nil, err
}

Expand All @@ -269,6 +270,7 @@ func storageListObjects(rows *sql.Rows, cursor string) (*api.StorageObjectList,
o.UserId = userID.String
objects = append(objects, o)
}
rows.Close()

if rows.Err() != nil {
return nil, rows.Err()
Expand Down
3 changes: 2 additions & 1 deletion server/core_tournament.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ WHERE duration > 0 AND start_time >= $1 AND end_time <= $2 AND category >= $3 AN
logger.Error("Could not retrieve tournaments", zap.Error(err))
return nil, err
}
defer rows.Close()

records := make([]*api.Tournament, 0)
newCursor := &tournamentListCursor{}
Expand All @@ -247,6 +246,7 @@ WHERE duration > 0 AND start_time >= $1 AND end_time <= $2 AND category >= $3 AN
for rows.Next() {
tournament, err := parseTournament(rows, now)
if err != nil {
rows.Close()
logger.Error("Error parsing listed tournament records", zap.Error(err))
return nil, err
}
Expand All @@ -258,6 +258,7 @@ WHERE duration > 0 AND start_time >= $1 AND end_time <= $2 AND category >= $3 AN
newCursor.TournamentId = records[limit-1].Id
}
}
rows.Close()

tournamentList := &api.TournamentList{
Tournaments: records,
Expand Down
4 changes: 3 additions & 1 deletion server/leaderboard_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ FROM leaderboard`
l.logger.Error("Error loading leaderboard cache from database", zap.Error(err))
return err
}
defer rows.Close()

leaderboards := make(map[string]*Leaderboard)

Expand All @@ -180,6 +179,7 @@ FROM leaderboard`
err = rows.Scan(&id, &authoritative, &sortOrder, &operator, &resetSchedule, &metadata, &createTime,
&category, &description, &duration, &endTime, &joinRequired, &maxSize, &maxNumScore, &title, &startTime)
if err != nil {
rows.Close()
l.logger.Error("Error parsing leaderboard cache from database", zap.Error(err))
return err
}
Expand All @@ -205,6 +205,7 @@ FROM leaderboard`
if resetSchedule.Valid {
expr, err := cronexpr.Parse(resetSchedule.String)
if err != nil {
rows.Close()
l.logger.Error("Error parsing leaderboard reset schedule from database", zap.Error(err))
return err
}
Expand All @@ -217,6 +218,7 @@ FROM leaderboard`

leaderboards[id] = leaderboard
}
rows.Close()

l.Lock()
l.leaderboards = leaderboards
Expand Down

0 comments on commit e3f93b2

Please sign in to comment.