@@ -577,11 +577,12 @@ SELECT collection, key, user_id, value, version, read, write, create_time, updat
577
577
578
578
func StorageWriteObjects (ctx context.Context , logger * zap.Logger , db * sql.DB , metrics Metrics , storageIndex StorageIndex , authoritativeWrite bool , ops StorageOpWrites ) (* api.StorageObjectAcks , codes.Code , error ) {
579
579
var acks []* api.StorageObjectAck
580
+ var sortedWrites StorageOpWrites
580
581
581
582
if err := ExecuteInTxPgx (ctx , db , func (tx pgx.Tx ) error {
582
583
// If the transaction is retried ensure we wipe any acks that may have been prepared by previous attempts.
583
584
var writeErr error
584
- acks , writeErr = storageWriteObjects (ctx , logger , metrics , tx , authoritativeWrite , ops )
585
+ sortedWrites , acks , writeErr = storageWriteObjects (ctx , logger , metrics , tx , authoritativeWrite , ops )
585
586
if writeErr != nil {
586
587
if writeErr == runtime .ErrStorageRejectedVersion || writeErr == runtime .ErrStorageRejectedPermission {
587
588
logger .Debug ("Error writing storage objects." , zap .Error (writeErr ))
@@ -601,27 +602,12 @@ func StorageWriteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, me
601
602
return nil , codes .Internal , err
602
603
}
603
604
604
- sw := make ([]* api.StorageObject , 0 , len (ops ))
605
- for i , o := range ops {
606
- sw = append (sw , & api.StorageObject {
607
- Collection : o .Object .Collection ,
608
- Key : o .Object .Key ,
609
- UserId : o .OwnerID ,
610
- Value : o .Object .Value ,
611
- Version : acks [i ].Version ,
612
- PermissionRead : o .Object .PermissionRead .GetValue (),
613
- PermissionWrite : o .Object .PermissionRead .GetValue (),
614
- CreateTime : acks [i ].CreateTime ,
615
- UpdateTime : acks [i ].UpdateTime ,
616
- })
617
- }
618
-
619
- storageIndex .Write (ctx , sw )
605
+ storageIndexWrite (ctx , storageIndex , sortedWrites , acks )
620
606
621
607
return & api.StorageObjectAcks {Acks : acks }, codes .OK , nil
622
608
}
623
609
624
- func storageWriteObjects (ctx context.Context , logger * zap.Logger , metrics Metrics , tx pgx.Tx , authoritativeWrite bool , ops StorageOpWrites ) ([]* api.StorageObjectAck , error ) {
610
+ func storageWriteObjects (ctx context.Context , logger * zap.Logger , metrics Metrics , tx pgx.Tx , authoritativeWrite bool , ops StorageOpWrites ) (StorageOpWrites , []* api.StorageObjectAck , error ) {
625
611
// Ensure writes are processed in a consistent order to avoid deadlocks from concurrent operations.
626
612
// Sorting done on a copy to ensure we don't modify the input, which may be re-used on transaction retries.
627
613
sortedOps := make (StorageOpWrites , 0 , len (ops ))
@@ -654,16 +640,16 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric
654
640
if err != nil && errors .As (err , & pgErr ) {
655
641
if pgErr .Code == dbErrorUniqueViolation {
656
642
metrics .StorageWriteRejectCount (map [string ]string {"collection" : object .Collection , "reason" : "version" }, 1 )
657
- return nil , runtime .ErrStorageRejectedVersion
643
+ return nil , nil , runtime .ErrStorageRejectedVersion
658
644
}
659
- return nil , err
645
+ return nil , nil , err
660
646
} else if err == pgx .ErrNoRows {
661
647
// Not every case from storagePrepWriteObject can return NoRows, but those
662
648
// which do are always ErrStorageRejectedVersion
663
649
metrics .StorageWriteRejectCount (map [string ]string {"collection" : object .Collection , "reason" : "version" }, 1 )
664
- return nil , runtime .ErrStorageRejectedVersion
650
+ return nil , nil , runtime .ErrStorageRejectedVersion
665
651
} else if err != nil {
666
- return nil , err
652
+ return nil , nil , err
667
653
}
668
654
669
655
if ! isUpsert {
@@ -672,11 +658,11 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric
672
658
if ! authoritativeWrite && resultWrite != 1 {
673
659
// - permission: non-authoritative write & original row write != 1
674
660
metrics .StorageWriteRejectCount (map [string ]string {"collection" : object .Collection , "reason" : "permission" }, 1 )
675
- return nil , runtime .ErrStorageRejectedPermission
661
+ return nil , nil , runtime .ErrStorageRejectedPermission
676
662
} else if object .Version != "" {
677
663
// - version mismatch
678
664
metrics .StorageWriteRejectCount (map [string ]string {"collection" : object .Collection , "reason" : "version" }, 1 )
679
- return nil , runtime .ErrStorageRejectedVersion
665
+ return nil , nil , runtime .ErrStorageRejectedVersion
680
666
}
681
667
}
682
668
@@ -691,7 +677,7 @@ func storageWriteObjects(ctx context.Context, logger *zap.Logger, metrics Metric
691
677
acks [indexedOps [op ]] = ack
692
678
}
693
679
694
- return acks , nil
680
+ return sortedOps , acks , nil
695
681
}
696
682
697
683
func storagePrepBatch (batch * pgx.Batch , authoritativeWrite bool , op * StorageOpWrite ) {
@@ -781,41 +767,10 @@ func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWr
781
767
}
782
768
783
769
func StorageDeleteObjects (ctx context.Context , logger * zap.Logger , db * sql.DB , storageIndex StorageIndex , authoritativeDelete bool , ops StorageOpDeletes ) (codes.Code , error ) {
784
- // Ensure deletes are processed in a consistent order.
785
- sort .Sort (ops )
786
-
787
- if err := ExecuteInTx (ctx , db , func (tx * sql.Tx ) error {
788
- for _ , op := range ops {
789
- params := []interface {}{op .ObjectID .Collection , op .ObjectID .Key , op .OwnerID }
790
- var query string
791
- if authoritativeDelete {
792
- // Deleting from the runtime.
793
- query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3"
794
- } else {
795
- // Direct client request to delete.
796
- query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3 AND write > 0"
797
- }
798
- if op .ObjectID .GetVersion () != "" {
799
- // Conditional delete.
800
- params = append (params , op .ObjectID .Version )
801
- query += " AND version = $4"
802
- }
803
-
804
- result , err := tx .ExecContext (ctx , query , params ... )
805
- if err != nil {
806
- logger .Debug ("Could not delete storage object." , zap .Error (err ), zap .String ("query" , query ), zap .Any ("object_id" , op .ObjectID ))
807
- return err
808
- }
809
-
810
- if authoritativeDelete && op .ObjectID .GetVersion () == "" {
811
- // If it's an authoritative delete and there is no OCC, the only reason rows affected would be 0 is having
812
- // nothing to delete. In that case it's safe to assume the deletion was just a no-op and there's no need
813
- // to check anything further. Should apply something similar to non-authoritative deletes too.
814
- continue
815
- }
816
- if rowsAffected , _ := result .RowsAffected (); rowsAffected == 0 {
817
- return StatusError (codes .InvalidArgument , "Storage delete rejected." , errors .New ("Storage delete rejected - not found, version check failed, or permission denied." ))
818
- }
770
+ if err := ExecuteInTxPgx (ctx , db , func (tx pgx.Tx ) error {
771
+ deleteErr := storageDeleteObjects (ctx , logger , tx , authoritativeDelete , ops )
772
+ if deleteErr != nil {
773
+ return deleteErr
819
774
}
820
775
return nil
821
776
}); err != nil {
@@ -830,3 +785,62 @@ func StorageDeleteObjects(ctx context.Context, logger *zap.Logger, db *sql.DB, s
830
785
831
786
return codes .OK , nil
832
787
}
788
+
789
+ func storageDeleteObjects (ctx context.Context , logger * zap.Logger , tx pgx.Tx , authoritativeDelete bool , ops StorageOpDeletes ) error {
790
+ // Ensure deletes are processed in a consistent order.
791
+ sort .Sort (ops )
792
+
793
+ for _ , op := range ops {
794
+ params := []interface {}{op .ObjectID .Collection , op .ObjectID .Key , op .OwnerID }
795
+ var query string
796
+ if authoritativeDelete {
797
+ // Deleting from the runtime.
798
+ query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3"
799
+ } else {
800
+ // Direct client request to delete.
801
+ query = "DELETE FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3 AND write > 0"
802
+ }
803
+ if op .ObjectID .GetVersion () != "" {
804
+ // Conditional delete.
805
+ params = append (params , op .ObjectID .Version )
806
+ query += " AND version = $4"
807
+ }
808
+
809
+ result , err := tx .Exec (ctx , query , params ... )
810
+ if err != nil {
811
+ logger .Debug ("Could not delete storage object." , zap .Error (err ), zap .String ("query" , query ), zap .Any ("object_id" , op .ObjectID ))
812
+ return err
813
+ }
814
+
815
+ if authoritativeDelete && op .ObjectID .GetVersion () == "" {
816
+ // If it's an authoritative delete and there is no OCC, the only reason rows affected would be 0 is having
817
+ // nothing to delete. In that case it's safe to assume the deletion was just a no-op and there's no need
818
+ // to check anything further. Should apply something similar to non-authoritative deletes too.
819
+ continue
820
+ }
821
+ if rowsAffected := result .RowsAffected (); rowsAffected == 0 {
822
+ return StatusError (codes .InvalidArgument , "Storage delete rejected." , errors .New ("Storage delete rejected - not found, version check failed, or permission denied." ))
823
+ }
824
+ }
825
+
826
+ return nil
827
+ }
828
+
829
+ func storageIndexWrite (ctx context.Context , storageIndex StorageIndex , ops StorageOpWrites , acks []* api.StorageObjectAck ) {
830
+ sw := make ([]* api.StorageObject , 0 , len (ops ))
831
+ for i , o := range ops {
832
+ sw = append (sw , & api.StorageObject {
833
+ Collection : o .Object .Collection ,
834
+ Key : o .Object .Key ,
835
+ UserId : o .OwnerID ,
836
+ Value : o .Object .Value ,
837
+ Version : acks [i ].Version ,
838
+ PermissionRead : o .Object .PermissionRead .GetValue (),
839
+ PermissionWrite : o .Object .PermissionRead .GetValue (),
840
+ CreateTime : acks [i ].CreateTime ,
841
+ UpdateTime : acks [i ].UpdateTime ,
842
+ })
843
+ }
844
+
845
+ storageIndex .Write (ctx , sw )
846
+ }
0 commit comments