Skip to content

Commit

Permalink
dml-in-clause: code refactoring addressing review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Apr 27, 2020
1 parent 01d2fb4 commit 30b588e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 41 deletions.
24 changes: 4 additions & 20 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,16 @@ func (del *Delete) execDeleteEqual(vcursor VCursor, bindVars map[string]*querypb
}

func (del *Delete) execDeleteIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
keys, err := del.Values[0].ResolveList(bindVars)
rss, queries, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values[0], del.Vindex)
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteIn")
return nil, err
}
rss, err := resolveMultiShard(vcursor, del.Vindex, del.Keyspace, keys)
if err != nil {
return nil, vterrors.Wrap(err, "execDeleteIn")
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: del.Query,
BindVariables: bindVars,
}
}

if del.OwnedVindexQuery != "" {
if err := del.deleteVindexEntries(vcursor, bindVars, rss); err != nil {
return nil, vterrors.Wrap(err, "execDeleteIn")
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
Expand All @@ -180,9 +166,7 @@ func (del *Delete) execDeleteByDestination(vcursor VCursor, bindVars map[string]
return nil, err
}
}
autocommit := (len(rss) == 1 || del.MultiShardAutocommit) && vcursor.AutocommitApproval()
res, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return res, vterrors.Aggregate(errs)
return execMultiShard(vcursor, rss, queries, del.MultiShardAutocommit)
}

// deleteVindexEntries performs an delete if table owns vindex.
Expand Down
28 changes: 28 additions & 0 deletions go/vt/vtgate/engine/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package engine
import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

Expand Down Expand Up @@ -96,3 +99,28 @@ var opcodeName = map[DMLOpcode]string{
func (op DMLOpcode) String() string {
return opcodeName[op]
}

func resolveMultiValueShards(vcursor VCursor, keyspace *vindexes.Keyspace, query string, bindVars map[string]*querypb.BindVariable, pv sqltypes.PlanValue, vindex vindexes.SingleColumn) ([]*srvtopo.ResolvedShard, []*querypb.BoundQuery, error) {
keys, err := pv.ResolveList(bindVars)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
rss, err := resolveMultiShard(vcursor, vindex, keyspace, keys)
if err != nil {
return nil, nil, vterrors.Wrap(err, "execDeleteIn")
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: query,
BindVariables: bindVars,
}
}
return rss, queries, nil
}

func execMultiShard(vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) {
autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
}
25 changes: 4 additions & 21 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,30 +141,16 @@ func (upd *Update) execUpdateEqual(vcursor VCursor, bindVars map[string]*querypb
}

func (upd *Update) execUpdateIn(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
keys, err := upd.Values[0].ResolveList(bindVars)
rss, queries, err := resolveMultiValueShards(vcursor, upd.Keyspace, upd.Query, bindVars, upd.Values[0], upd.Vindex)
if err != nil {
return nil, vterrors.Wrap(err, "execUpdateIn")
return nil, err
}
rss, err := resolveMultiShard(vcursor, upd.Vindex, upd.Keyspace, keys)
if err != nil {
return nil, vterrors.Wrap(err, "execUpdateIn")
}
queries := make([]*querypb.BoundQuery, len(rss))
for i := range rss {
queries[i] = &querypb.BoundQuery{
Sql: upd.Query,
BindVariables: bindVars,
}
}

if len(upd.ChangedVindexValues) != 0 {
if err := upd.updateVindexEntries(vcursor, bindVars, rss); err != nil {
return nil, vterrors.Wrap(err, "execUpdateIn")
}
}
autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
return execMultiShard(vcursor, rss, queries, upd.MultiShardAutocommit)
}

func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string]*querypb.BindVariable, dest key.Destination) (*sqltypes.Result, error) {
Expand All @@ -187,10 +173,7 @@ func (upd *Update) execUpdateByDestination(vcursor VCursor, bindVars map[string]
return nil, vterrors.Wrap(err, "execUpdateByDestination")
}
}

autocommit := (len(rss) == 1 || upd.MultiShardAutocommit) && vcursor.AutocommitApproval()
result, errs := vcursor.ExecuteMultiShard(rss, queries, true /* rollbackOnError */, autocommit)
return result, vterrors.Aggregate(errs)
return execMultiShard(vcursor, rss, queries, upd.MultiShardAutocommit)
}

// updateVindexEntries performs an update when a vindex is being modified
Expand Down

0 comments on commit 30b588e

Please sign in to comment.