Skip to content

Commit

Permalink
br: support PiTR feature (pingcap#34409)
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau authored May 12, 2022
1 parent 0182b65 commit c6fe032
Show file tree
Hide file tree
Showing 75 changed files with 7,412 additions and 141 deletions.
2 changes: 1 addition & 1 deletion br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, task.FullBackupCmd)
},
}
task.DefineFilterFlags(command, acceptAllTables)
task.DefineFilterFlags(command, acceptAllTables, false)
return command
}

Expand Down
4 changes: 2 additions & 2 deletions br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0, true)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Info.ID] = int64(tableID)
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func encodeBackupMetaCommand() *cobra.Command {
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return errors.Trace(err)
}
_, s, err := task.GetStorage(ctx, &cfg)
_, s, err := task.GetStorage(ctx, cfg.Storage, &cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down
3 changes: 2 additions & 1 deletion br/cmd/br/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ func main() {
NewDebugCommand(),
NewBackupCommand(),
NewRestoreCommand(),
NewStreamCommand(),
)
// Ouputs cmd.Print to stdout.
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)

rootCmd.SetArgs(os.Args[1:])
Expand Down
23 changes: 22 additions & 1 deletion br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
return errors.Trace(err)
}

if task.IsStreamRestore(cmdName) {
if err := cfg.ParseStreamRestoreFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
}

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
Expand Down Expand Up @@ -83,6 +89,7 @@ func NewRestoreCommand() *cobra.Command {
newDBRestoreCommand(),
newTableRestoreCommand(),
newRawRestoreCommand(),
newStreamRestoreCommand(),
)
task.DefineRestoreFlags(command.PersistentFlags())

Expand All @@ -98,7 +105,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, task.FullRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables)
task.DefineFilterFlags(command, filterOutSysAndMemTables, false)
return command
}

Expand Down Expand Up @@ -141,3 +148,17 @@ func newRawRestoreCommand() *cobra.Command {
task.DefineRawRestoreFlags(command)
return command
}

func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "point",
Short: "restore data from log until specify commit timestamp",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runRestoreCommand(command, task.PointRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables, true)
task.DefineStreamRestoreFlags(command)
return command
}
190 changes: 190 additions & 0 deletions br/cmd/br/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/trace"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/spf13/cobra"
"sourcegraph.com/sourcegraph/appdash"
)

// NewStreamCommand specifies adding several commands for backup log
func NewStreamCommand() *cobra.Command {
command := &cobra.Command{
Use: "log",
Short: "backup stream log from TiDB/TiKV cluster",
SilenceUsage: true,
PersistentPreRunE: func(c *cobra.Command, args []string) error {
if err := Init(c); err != nil {
return errors.Trace(err)
}
build.LogInfo(build.BR)
utils.LogEnvVariables()
task.LogArguments(c)
return nil
},
}

command.AddCommand(
newStreamStartCommand(),
newStreamStopCommand(),
newStreamPauseCommand(),
newStreamResumeCommand(),
newStreamStatusCommand(),
newStreamTruncateCommand(),
)
command.SetHelpFunc(func(command *cobra.Command, strings []string) {
task.HiddenFlagsForStream(command.Root().PersistentFlags())
command.Root().HelpFunc()(command, strings)
})

return command
}

func newStreamStartCommand() *cobra.Command {
command := &cobra.Command{
Use: "start",
Short: "start a log backup task",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamStart)
},
}

task.DefineFilterFlags(command, acceptAllTables, true)
task.DefineStreamStartFlags(command.Flags())
return command
}

func newStreamStopCommand() *cobra.Command {
command := &cobra.Command{
Use: "stop",
Short: "stop a log backup task",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamStop)
},
}

task.DefineStreamCommonFlags(command.Flags())
return command
}

//nolint:unused,deadcode
func newStreamPauseCommand() *cobra.Command {
command := &cobra.Command{
Use: "pause",
Short: "pause a log backup task",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamPause)
},
}

task.DefineStreamPauseFlags(command.Flags())
return command
}

//nolint:unused,deadcode
func newStreamResumeCommand() *cobra.Command {
command := &cobra.Command{
Use: "resume",
Short: "resume a log backup task",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamResume)
},
}

task.DefineStreamCommonFlags(command.Flags())
return command
}

func newStreamStatusCommand() *cobra.Command {
command := &cobra.Command{
Use: "status",
Short: "get status for the log backup task",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamStatus)
},
}

task.DefineStreamStatusCommonFlags(command.Flags())
return command
}

func newStreamTruncateCommand() *cobra.Command {
command := &cobra.Command{
Use: "truncate",
Short: "truncate the incremental log until sometime.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamTruncate)
},
}
task.DefineStreamTruncateLogFlags(command.Flags())
return command
}

func streamCommand(command *cobra.Command, cmdName string) error {
var cfg task.StreamConfig
var err error
defer func() {
if err != nil {
command.SilenceUsage = false
}
}()

cfg.Config = task.Config{LogProgress: HasLogFile()}
if err = cfg.Config.ParseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}

switch cmdName {
case task.StreamTruncate:
if err = cfg.ParseStreamTruncateFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamStatus:
if err = cfg.ParseStreamStatusFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamStart:
if err = cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamPause:
if err = cfg.ParseStreamPauseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
default:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
}
ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
ctx, store = trace.TracerStartSpan(ctx)
defer trace.TracerFinishSpan(ctx, store)
}

return task.RunStreamCommand(ctx, tidbGlue, cmdName, &cfg)
}
37 changes: 34 additions & 3 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func BuildBackupRangeAndSchema(
}

ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
backupSchemas := NewBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand All @@ -330,7 +330,7 @@ func BuildBackupRangeAndSchema(

if len(tables) == 0 {
log.Info("backup empty database", zap.Stringer("db", dbInfo.Name))
backupSchemas.addSchema(dbInfo, nil)
backupSchemas.AddSchema(dbInfo, nil)
continue
}

Expand Down Expand Up @@ -399,7 +399,7 @@ func BuildBackupRangeAndSchema(
}
tableInfo.Indices = tableInfo.Indices[:n]

backupSchemas.addSchema(dbInfo, tableInfo)
backupSchemas.AddSchema(dbInfo, tableInfo)

tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
Expand All @@ -421,6 +421,37 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, policies, nil
}

// BuildFullSchema builds a full backup schemas for databases and tables.
func BuildFullSchema(storage kv.Storage, backupTS uint64) (*Schemas, error) {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

newBackupSchemas := NewBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, errors.Trace(err)
}

for _, db := range dbs {
tables, err := m.ListTables(db.ID)
if err != nil {
return nil, errors.Trace(err)
}

// backup this empty db if this schema is empty.
if len(tables) == 0 {
newBackupSchemas.AddSchema(db, nil)
}

for _, table := range tables {
// add table
newBackupSchemas.AddSchema(db, table)
}
}

return newBackupSchemas, nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
switch job.Type {
// TiDB V5.3.0 supports TableAttributes and TablePartitionAttributes.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
require.NoErrorf(t, err, "Error get ts: %s", err)

cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, &cipher)
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, s.cluster.Storage, lastTS, ts)
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ type Schemas struct {
schemas map[string]*schemaInfo
}

func newBackupSchemas() *Schemas {
func NewBackupSchemas() *Schemas {
return &Schemas{
schemas: make(map[string]*schemaInfo),
}
}

func (ss *Schemas) addSchema(
func (ss *Schemas) AddSchema(
dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
if tableInfo == nil {
Expand Down Expand Up @@ -134,7 +134,9 @@ func (ss *Schemas) BackupSchemas(
if err := metaWriter.Send(s, op); err != nil {
return errors.Trace(err)
}
updateCh.Inc()
if updateCh != nil {
updateCh.Inc()
}
return nil
})
}
Expand Down
Loading

0 comments on commit c6fe032

Please sign in to comment.