Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113285: workload/schemachange: implement ALTER TABLE ALTER PRIMARY KEY r=rafiss a=chrisseto

#### 193b04f workload/schemachange: implement ALTER TABLE ALTER PRIMARY KEY

This commit adds support for the `ALTER TABLE ... ALTER PRIMARY KEY` DDL
to the random schema change workload.

Epic: CRDB-19168
Informs: cockroachdb#59595

Release note: None

114598: sql/stats: Correctly cancel stats background context on quiesce. r=benbardin a=benbardin

Currently, this can get stuck while quiescing in `ensureAllTables()`, where a SQL query is issued and cannot be completed.

Fixes: cockroachdb#114296
Release note: None

Co-authored-by: Chris Seto <[email protected]>
Co-authored-by: Ben Bardin <[email protected]>
  • Loading branch information
3 people committed Nov 17, 2023
3 parents 8d026f9 + 193b04f + bf24b7c commit 7b5b03b
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 33 deletions.
12 changes: 5 additions & 7 deletions pkg/sql/stats/automatic_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ func (r *Refresher) SetDraining() {
func (r *Refresher) Start(
ctx context.Context, stopper *stop.Stopper, refreshInterval time.Duration,
) error {
bgCtx := r.AnnotateCtx(context.Background())
stoppingCtx, _ := stopper.WithCancelOnQuiesce(context.Background())
bgCtx := r.AnnotateCtx(stoppingCtx)
r.startedTasksWG.Add(1)
if err := stopper.RunAsyncTask(bgCtx, "refresher", func(ctx context.Context) {
defer r.startedTasksWG.Done()
Expand Down Expand Up @@ -472,7 +473,7 @@ func (r *Refresher) Start(
break
case <-r.drainAutoStats:
return
case <-stopper.ShouldQuiesce():
case <-ctx.Done():
return
}

Expand Down Expand Up @@ -517,9 +518,7 @@ func (r *Refresher) Start(
r.maybeRefreshStats(ctx, stopper, tableID, explicitSettings, rowsAffected, r.asOfTime)

select {
case <-stopper.ShouldQuiesce():
// Don't bother trying to refresh the remaining tables if we
// are shutting down.
case <-ctx.Done():
return
case <-r.drainAutoStats:
// Ditto.
Expand Down Expand Up @@ -554,8 +553,7 @@ func (r *Refresher) Start(
case <-r.drainAutoStats:
log.Infof(ctx, "draining auto stats refresher")
return
case <-stopper.ShouldQuiesce():
log.Info(ctx, "quiescing auto stats refresher")
case <-ctx.Done():
return
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/workload/schemachange/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/randutil",
"//pkg/util/syncutil",
Expand Down
126 changes: 126 additions & 0 deletions pkg/workload/schemachange/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync/atomic"
"text/template"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -2398,6 +2400,130 @@ func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (*op
return stmt, nil
}

func (og *operationGenerator) alterTableAlterPrimaryKey(
ctx context.Context, tx pgx.Tx,
) (*opStmt, error) {
type Column struct {
Name tree.Name
Nullable bool
Unique bool
}

rowToTableName := func(row pgx.CollectableRow) (*tree.UnresolvedName, error) {
var schema string
var name string
if err := row.Scan(&schema, &name); err != nil {
return nil, err
}
return tree.NewUnresolvedName(schema, name), nil
}

columnsFrom := func(table tree.NodeFormatter) ([]Column, error) {
query := With([]CTE{
{"stats", fmt.Sprintf(`SELECT * FROM [SHOW STATISTICS FOR TABLE %v]`, table)},
{"unique_columns", `SELECT column_names[1] AS name FROM stats WHERE row_count = distinct_count AND array_length(column_names, 1) = 1`},
}, fmt.Sprintf(`SELECT column_name, is_nullable, EXISTS(SELECT * FROM unique_columns WHERE name = column_name) FROM [SHOW COLUMNS FROM %v] WHERE NOT is_hidden`, table))

return Collect(ctx, og, tx, pgx.RowToStructByPos[Column], query)
}

ctes := []CTE{
{"tables", `SELECT * FROM [SHOW TABLES] WHERE type = 'table'`},
{"descriptors", descJSONQuery},
{"tables_undergoing_schema_changes", `SELECT id FROM descriptors WHERE descriptor ? 'table' AND json_array_length(descriptor->'table'->'mutations') > 0`},
}

tablesUndergoingSchemaChangesQuery := With(ctes, `SELECT schema_name, table_name FROM tables WHERE NOT EXISTS(SELECT * FROM tables_undergoing_schema_changes WHERE id = (schema_name || '.' || table_name)::regclass::oid)`)
tablesNotUndergoingSchemaChangesQuery := With(ctes, `SELECT schema_name, table_name FROM tables WHERE EXISTS(SELECT * FROM tables_undergoing_schema_changes WHERE id = (schema_name || '.' || table_name)::regclass::oid)`)

var table *tree.UnresolvedName
stmt, code, err := Generate[*tree.AlterTable](og.params.rng, og.produceError(), []GenerationCase{
// IF EXISTS should noop if the table doesn't exist.
{pgcode.SuccessfulCompletion, `ALTER TABLE IF EXISTS "NonExistentTable" ALTER PRIMARY KEY USING COLUMNS ("IrrelevantColumn")`},
// Targeting a table that doesn't exist should error out.
{pgcode.UndefinedTable, `ALTER TABLE "NonExistentTable" ALTER PRIMARY KEY USING COLUMNS ("IrrelevantColumn")`},
// Targeting a column that doesn't exist should error out.
{pgcode.InvalidSchemaDefinition, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ("NonExistentColumn")`},
// NonUniqueColumns can't be used as PKs.
{pgcode.UniqueViolation, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({NonUniqueColumns})`},
// NullableColumns can't be used as PKs.
{pgcode.InvalidSchemaDefinition, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({NullableColumns})`},
// Tables undergoing a schema change may not have their PK changed.
// TODO(chrisseto): This case doesn't cause errors as expected.
// {pgcode.Code{}, `ALTER TABLE {TableUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns})`},
// Successful cases.
{pgcode.SuccessfulCompletion, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns})`},
{pgcode.SuccessfulCompletion, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns}) USING HASH`},
// TODO(chrisseto): Add support for hash parameters and storage parameters.
}, template.FuncMap{
"TableNotUnderGoingSchemaChange": func() (*tree.UnresolvedName, error) {
tables, err := Collect(ctx, og, tx, rowToTableName, tablesNotUndergoingSchemaChangesQuery)
if err != nil {
return nil, err
}
table, err = PickOne(og.params.rng, tables)
return table, err
},
"TableUnderGoingSchemaChange": func() (*tree.UnresolvedName, error) {
tables, err := Collect(ctx, og, tx, rowToTableName, tablesUndergoingSchemaChangesQuery)
if err != nil {
return nil, err
}
table, err = PickOne(og.params.rng, tables)
return table, err
},
"NullableColumns": func() (Values, error) {
columns, err := columnsFrom(table)
if err != nil {
return nil, err
}

names := util.Map(util.Filter(columns, func(c Column) bool {
return c.Nullable
}), func(c Column) *tree.Name {
return &c.Name
})

return AsValues(PickAtLeast(og.params.rng, 1, names))
},
"NonUniqueColumns": func() (Values, error) {
columns, err := columnsFrom(table)
if err != nil {
return nil, err
}

names := util.Map(util.Filter(columns, func(c Column) bool {
return !c.Nullable && !c.Unique
}), func(c Column) *tree.Name {
return &c.Name
})

return AsValues(PickAtLeast(og.params.rng, 1, names))
},
"UniqueNotNullableColumns": func() (Values, error) {
columns, err := columnsFrom(table)
if err != nil {
return nil, err
}

names := util.Map(util.Filter(columns, func(c Column) bool {
return !c.Nullable && c.Unique
}), func(c Column) *tree.Name {
return &c.Name
})

return AsValues(PickAtLeast(og.params.rng, 1, names))
},
})
if err != nil {
return nil, err
}

return newOpStmt(stmt, codesWithConditions{
{code, true},
}), nil
}

func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (*opStmt, error) {
dbRegions, err := og.getDatabaseRegionNames(ctx, tx)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/workload/schemachange/optype.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ const (
alterTableAddConstraintForeignKey // ALTER TABLE <table> ADD CONSTRAINT <constraint> FOREIGN KEY (<column>) REFERENCES <table> (<column>)
alterTableAddConstraintUnique // ALTER TABLE <table> ADD CONSTRAINT <constraint> UNIQUE (<column>)
alterTableAlterColumnType // ALTER TABLE <table> ALTER [COLUMN] <column> [SET DATA] TYPE <type>
alterTableAlterPrimaryKey // ALTER TABLE <table> ALTER PRIMARY KEY USING COLUMNS (<columns>)
alterTableDropColumn // ALTER TABLE <table> DROP COLUMN <column>
alterTableDropColumnDefault // ALTER TABLE <table> ALTER [COLUMN] <column> DROP DEFAULT
alterTableDropConstraint // ALTER TABLE <table> DROP CONSTRAINT <constraint>
alterTableDropNotNull // ALTER TABLE <table> ALTER [COLUMN] <column> DROP NOT NULL
alterTableDropColumnDefault // ALTER TABLE <table> ALTER [COLUMN] <column> DROP DEFAULT
alterTableDropStored // ALTER TABLE <table> ALTER [COLUMN] <column> DROP STORED
alterTableLocality // ALTER TABLE <table> LOCALITY <locality>
alterTableRenameColumn // ALTER TABLE <table> RENAME [COLUMN] <column> TO <column>
Expand Down Expand Up @@ -151,7 +152,6 @@ const (
// alterSchemaOwner
// alterSchemaRename
// alterSequence
// alterTableAlterPrimaryKey
// alterTableInjectStats
// alterTableOwner
// alterTablePartitionByTable
Expand Down Expand Up @@ -216,6 +216,7 @@ var opFuncs = []func(*operationGenerator, context.Context, pgx.Tx) (*opStmt, err
alterTableAddConstraintForeignKey: (*operationGenerator).addForeignKeyConstraint,
alterTableAddConstraintUnique: (*operationGenerator).addUniqueConstraint,
alterTableAlterColumnType: (*operationGenerator).setColumnType,
alterTableAlterPrimaryKey: (*operationGenerator).alterTableAlterPrimaryKey,
alterTableDropColumn: (*operationGenerator).dropColumn,
alterTableDropColumnDefault: (*operationGenerator).dropColumnDefault,
alterTableDropConstraint: (*operationGenerator).dropConstraint,
Expand Down Expand Up @@ -282,6 +283,7 @@ var opWeights = []int{
renameView: 1,
alterTableSetColumnDefault: 1,
alterTableSetColumnNotNull: 1,
alterTableAlterPrimaryKey: 1,
alterTableAlterColumnType: 0, // Disabled and tracked with #66662.
alterDatabaseSurvivalGoal: 0, // Disabled and tracked with #83831
}
Expand Down
51 changes: 27 additions & 24 deletions pkg/workload/schemachange/optype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/workload/schemachange/query_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"github.com/jackc/pgx/v5"
)

const (
descJSONQuery = `SELECT id, crdb_internal.pb_to_json('desc', descriptor) AS descriptor FROM system.descriptor`
)

type CTE struct {
As string
Query string
Expand Down

0 comments on commit 7b5b03b

Please sign in to comment.