Skip to content

Commit

Permalink
Update changelog, improve sorting of storage ops.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Mar 25, 2022
1 parent c33b642 commit 1c34ae2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Changed
- Ensure storage write ops return acks in the same order as inputs.

### Fixed
- Fix data returned by StreamUserList in JS runtime.
- Allow passing lists of presences as match init parameters to Go runtime matches.
Expand Down
20 changes: 15 additions & 5 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,19 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, au
}

func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, authoritativeWrite bool, ops StorageOpWrites) ([]*api.StorageObjectAck, error) {
acks := make([]*api.StorageObjectAck, 0, ops.Len())

for _, op := range ops {
// Ensure writes are processed in a consistent order to avoid deadlocks from concurrent operations.
// Sorting done on a copy to ensure we don't modify the input, which may be re-used on transaction retries.
sortedOps := make(StorageOpWrites, 0, len(ops))
indexedOps := make(map[*StorageOpWrite]int, len(ops))
for i, op := range ops {
sortedOps = append(sortedOps, op)
indexedOps[op] = i
}
sort.Sort(sortedOps)

// Run operations in the sorted order.
acks := make([]*api.StorageObjectAck, ops.Len())
for _, op := range sortedOps {
ack, writeErr := storageWriteObject(ctx, logger, tx, authoritativeWrite, op.OwnerID, op.Object)
if writeErr != nil {
if writeErr == runtime.ErrStorageRejectedVersion || writeErr == runtime.ErrStorageRejectedPermission {
Expand All @@ -498,9 +508,9 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, tx *sql.Tx, au
logger.Debug("Error writing storage objects.", zap.Error(writeErr))
return nil, writeErr
}
acks = append(acks, ack)
// Acks are returned in the original order.
acks[indexedOps[op]] = ack
}

return acks, nil
}

Expand Down

0 comments on commit 1c34ae2

Please sign in to comment.