diff --git a/pkg/sql/stats/automatic_stats.go b/pkg/sql/stats/automatic_stats.go index 5256777fb5ae..0049a256f938 100644 --- a/pkg/sql/stats/automatic_stats.go +++ b/pkg/sql/stats/automatic_stats.go @@ -406,7 +406,8 @@ func (r *Refresher) SetDraining() { func (r *Refresher) Start( ctx context.Context, stopper *stop.Stopper, refreshInterval time.Duration, ) error { - bgCtx := r.AnnotateCtx(context.Background()) + stoppingCtx, _ := stopper.WithCancelOnQuiesce(context.Background()) + bgCtx := r.AnnotateCtx(stoppingCtx) r.startedTasksWG.Add(1) if err := stopper.RunAsyncTask(bgCtx, "refresher", func(ctx context.Context) { defer r.startedTasksWG.Done() @@ -472,7 +473,7 @@ func (r *Refresher) Start( break case <-r.drainAutoStats: return - case <-stopper.ShouldQuiesce(): + case <-ctx.Done(): return } @@ -517,9 +518,7 @@ func (r *Refresher) Start( r.maybeRefreshStats(ctx, stopper, tableID, explicitSettings, rowsAffected, r.asOfTime) select { - case <-stopper.ShouldQuiesce(): - // Don't bother trying to refresh the remaining tables if we - // are shutting down. + case <-ctx.Done(): return case <-r.drainAutoStats: // Ditto. @@ -554,8 +553,7 @@ func (r *Refresher) Start( case <-r.drainAutoStats: log.Infof(ctx, "draining auto stats refresher") return - case <-stopper.ShouldQuiesce(): - log.Info(ctx, "quiescing auto stats refresher") + case <-ctx.Done(): return } } diff --git a/pkg/workload/schemachange/BUILD.bazel b/pkg/workload/schemachange/BUILD.bazel index d18f0b94cd88..2fa3b8dec578 100644 --- a/pkg/workload/schemachange/BUILD.bazel +++ b/pkg/workload/schemachange/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/sql/sem/catconstants", "//pkg/sql/sem/tree", "//pkg/sql/types", + "//pkg/util", "//pkg/util/encoding", "//pkg/util/randutil", "//pkg/util/syncutil", diff --git a/pkg/workload/schemachange/operation_generator.go b/pkg/workload/schemachange/operation_generator.go index a46834244841..ba072a692542 100644 --- a/pkg/workload/schemachange/operation_generator.go +++ b/pkg/workload/schemachange/operation_generator.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync/atomic" + "text/template" "time" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -2398,6 +2400,130 @@ func (og *operationGenerator) setColumnType(ctx context.Context, tx pgx.Tx) (*op return stmt, nil } +func (og *operationGenerator) alterTableAlterPrimaryKey( + ctx context.Context, tx pgx.Tx, +) (*opStmt, error) { + type Column struct { + Name tree.Name + Nullable bool + Unique bool + } + + rowToTableName := func(row pgx.CollectableRow) (*tree.UnresolvedName, error) { + var schema string + var name string + if err := row.Scan(&schema, &name); err != nil { + return nil, err + } + return tree.NewUnresolvedName(schema, name), nil + } + + columnsFrom := func(table tree.NodeFormatter) ([]Column, error) { + query := With([]CTE{ + {"stats", fmt.Sprintf(`SELECT * FROM [SHOW STATISTICS FOR TABLE %v]`, table)}, + {"unique_columns", `SELECT column_names[1] AS name FROM stats WHERE row_count = distinct_count AND array_length(column_names, 1) = 1`}, + }, fmt.Sprintf(`SELECT column_name, is_nullable, EXISTS(SELECT * FROM unique_columns WHERE name = column_name) FROM [SHOW COLUMNS FROM %v] WHERE NOT is_hidden`, table)) + + return Collect(ctx, og, tx, pgx.RowToStructByPos[Column], query) + } + + ctes := []CTE{ + {"tables", `SELECT * FROM [SHOW TABLES] WHERE type = 'table'`}, + {"descriptors", descJSONQuery}, + {"tables_undergoing_schema_changes", `SELECT id FROM descriptors WHERE descriptor ? 'table' AND json_array_length(descriptor->'table'->'mutations') > 0`}, + } + + tablesUndergoingSchemaChangesQuery := With(ctes, `SELECT schema_name, table_name FROM tables WHERE NOT EXISTS(SELECT * FROM tables_undergoing_schema_changes WHERE id = (schema_name || '.' || table_name)::regclass::oid)`) + tablesNotUndergoingSchemaChangesQuery := With(ctes, `SELECT schema_name, table_name FROM tables WHERE EXISTS(SELECT * FROM tables_undergoing_schema_changes WHERE id = (schema_name || '.' || table_name)::regclass::oid)`) + + var table *tree.UnresolvedName + stmt, code, err := Generate[*tree.AlterTable](og.params.rng, og.produceError(), []GenerationCase{ + // IF EXISTS should noop if the table doesn't exist. + {pgcode.SuccessfulCompletion, `ALTER TABLE IF EXISTS "NonExistentTable" ALTER PRIMARY KEY USING COLUMNS ("IrrelevantColumn")`}, + // Targeting a table that doesn't exist should error out. + {pgcode.UndefinedTable, `ALTER TABLE "NonExistentTable" ALTER PRIMARY KEY USING COLUMNS ("IrrelevantColumn")`}, + // Targeting a column that doesn't exist should error out. + {pgcode.InvalidSchemaDefinition, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ("NonExistentColumn")`}, + // NonUniqueColumns can't be used as PKs. + {pgcode.UniqueViolation, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({NonUniqueColumns})`}, + // NullableColumns can't be used as PKs. + {pgcode.InvalidSchemaDefinition, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({NullableColumns})`}, + // Tables undergoing a schema change may not have their PK changed. + // TODO(chrisseto): This case doesn't cause errors as expected. + // {pgcode.Code{}, `ALTER TABLE {TableUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns})`}, + // Successful cases. + {pgcode.SuccessfulCompletion, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns})`}, + {pgcode.SuccessfulCompletion, `ALTER TABLE {TableNotUnderGoingSchemaChange} ALTER PRIMARY KEY USING COLUMNS ({UniqueNotNullableColumns}) USING HASH`}, + // TODO(chrisseto): Add support for hash parameters and storage parameters. + }, template.FuncMap{ + "TableNotUnderGoingSchemaChange": func() (*tree.UnresolvedName, error) { + tables, err := Collect(ctx, og, tx, rowToTableName, tablesNotUndergoingSchemaChangesQuery) + if err != nil { + return nil, err + } + table, err = PickOne(og.params.rng, tables) + return table, err + }, + "TableUnderGoingSchemaChange": func() (*tree.UnresolvedName, error) { + tables, err := Collect(ctx, og, tx, rowToTableName, tablesUndergoingSchemaChangesQuery) + if err != nil { + return nil, err + } + table, err = PickOne(og.params.rng, tables) + return table, err + }, + "NullableColumns": func() (Values, error) { + columns, err := columnsFrom(table) + if err != nil { + return nil, err + } + + names := util.Map(util.Filter(columns, func(c Column) bool { + return c.Nullable + }), func(c Column) *tree.Name { + return &c.Name + }) + + return AsValues(PickAtLeast(og.params.rng, 1, names)) + }, + "NonUniqueColumns": func() (Values, error) { + columns, err := columnsFrom(table) + if err != nil { + return nil, err + } + + names := util.Map(util.Filter(columns, func(c Column) bool { + return !c.Nullable && !c.Unique + }), func(c Column) *tree.Name { + return &c.Name + }) + + return AsValues(PickAtLeast(og.params.rng, 1, names)) + }, + "UniqueNotNullableColumns": func() (Values, error) { + columns, err := columnsFrom(table) + if err != nil { + return nil, err + } + + names := util.Map(util.Filter(columns, func(c Column) bool { + return !c.Nullable && c.Unique + }), func(c Column) *tree.Name { + return &c.Name + }) + + return AsValues(PickAtLeast(og.params.rng, 1, names)) + }, + }) + if err != nil { + return nil, err + } + + return newOpStmt(stmt, codesWithConditions{ + {code, true}, + }), nil +} + func (og *operationGenerator) survive(ctx context.Context, tx pgx.Tx) (*opStmt, error) { dbRegions, err := og.getDatabaseRegionNames(ctx, tx) if err != nil { diff --git a/pkg/workload/schemachange/optype.go b/pkg/workload/schemachange/optype.go index 800b389e96fa..84ce8187a461 100644 --- a/pkg/workload/schemachange/optype.go +++ b/pkg/workload/schemachange/optype.go @@ -94,10 +94,11 @@ const ( alterTableAddConstraintForeignKey // ALTER TABLE