Skip to content

Commit

Permalink
Improve OCC applied to high contention storage writes.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Oct 16, 2019
1 parent f67ce46 commit 14a3d94
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Added
- Custom events API for client and runtime events.

### Changed
- Default runtime HTTP key value is no longer the same as the default Server key value.
- Group create now returns HTTP 409 Conflict/GRPC Code 6 when group name is already in use.

### Fixed
- Correctly handle errors when concurrently writing new storage objects.
- Correctly apply optimistic concurrency controls to individual storage objects under high write contention.

## [2.7.0] - 2019-09-11
### Added
- Enable RPC functions to receive and return raw JSON data.
Expand Down
34 changes: 27 additions & 7 deletions server/core_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/gob"
"errors"
"fmt"
"github.com/jackc/pgx"
"sort"

"context"
Expand Down Expand Up @@ -555,23 +556,42 @@ func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, aut
return ack, nil
}

params := []interface{}{object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite}
var query string
if dbVersion.Valid {
// Updating an existing storage object.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID"
} else {
// Inserting a new storage object.
switch {
case object.Version != "" && object.Version != "*":
// OCC if match.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
params = append(params, object.Version)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
case dbVersion.Valid && object.Version != "*":
// An existing storage object was present, but no OCC if-not-exists required.
query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
params = append(params, dbVersion.String)
// Respect permissions in non-authoritative writes.
if !authoritativeWrite {
query += " AND write = 1"
}
default:
// OCC if-not-exists, and all other non-OCC cases.
query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())"
// Existing permission checks are not applicable for new storage objects.
}

res, err := tx.ExecContext(ctx, query, object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite)
res, err := tx.ExecContext(ctx, query, params...)
if err != nil {
logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
if e, ok := err.(pgx.PgError); ok && e.Code == dbErrorUniqueViolation {
return nil, ErrStorageRejectedVersion
}
return nil, err
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
return nil, ErrStorageWriteFailed
return nil, ErrStorageRejectedVersion
}

ack := &api.StorageObjectAck{
Expand Down

0 comments on commit 14a3d94

Please sign in to comment.