Skip to content

Commit

Permalink
*: Snapshot returns local etcd version
Browse files Browse the repository at this point in the history
Co-authored-by: Lili Cosic <[email protected]>
  • Loading branch information
serathius and lilic committed Jun 14, 2021
1 parent eca086e commit e1b1d93
Show file tree
Hide file tree
Showing 15 changed files with 797 additions and 506 deletions.
4 changes: 4 additions & 0 deletions Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,10 @@
"type": "string",
"format": "uint64",
"title": "remaining_bytes is the number of blob bytes to be sent after this message"
},
"version": {
"description": "local version of server that created the snapshot.\nIn cluster with binaries with different version, each cluster can return different result.\nInforms which etcd server version should be used when restoring the snapshot.",
"type": "string"
}
}
},
Expand Down
571 changes: 313 additions & 258 deletions api/etcdserverpb/rpc.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ message SnapshotResponse {

// blob contains the next chunk of the snapshot in the snapshot stream.
bytes blob = 3;

// local version of server that created the snapshot.
// In cluster with binaries with different version, each cluster can return different result.
// Informs which etcd server version should be used when restoring the snapshot.
string version = 4;
}

message WatchRequest {
Expand Down
103 changes: 88 additions & 15 deletions client/v3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,15 @@ type Maintenance interface {
// is non-zero, the hash is computed on all keys at or below the given revision.
HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)

// SnapshotWithVersion returns a reader for a point-in-time snapshot and version of etcd that created it.
// If the context "ctx" is canceled or timed out, reading from returned
// "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error)

// Snapshot provides a reader for a point-in-time snapshot of etcd.
// If the context "ctx" is canceled or timed out, reading from returned
// "io.ReadCloser" would error out (e.g. context.Canceled, context.DeadlineExceeded).
// Deprecated: use SnapshotWithVersion instead.
Snapshot(ctx context.Context) (io.ReadCloser, error)

// MoveLeader requests current leader to transfer its leadership to the transferee.
Expand All @@ -78,6 +84,21 @@ type Maintenance interface {
Downgrade(ctx context.Context, action int32, version string) (*DowngradeResponse, error)
}

// SnapshotResponse is aggregated response from the snapshot stream.
// Consumer is responsible for closing steam by calling .Snapshot.Close()
type SnapshotResponse struct {
// Header is the first header in the snapshot stream, has the current key-value store information
// and indicates the point in time of the snapshot.
Header *pb.ResponseHeader
// Snapshot exposes ReaderCloser interface for data stored in the Blob field in the snapshot stream.
Snapshot io.ReadCloser
// Version is the local version of server that created the snapshot.
// In cluster with binaries with different version, each cluster can return different result.
// Informs which etcd server version should be used when restoring the snapshot.
// Supported on etcd >= v3.6.
Version string
}

type maintenance struct {
lg *zap.Logger
dial func(endpoint string) (pb.MaintenanceClient, func(), error)
Expand Down Expand Up @@ -213,40 +234,92 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*
return (*HashKVResponse)(resp), nil
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
func (m *maintenance) SnapshotWithVersion(ctx context.Context) (*SnapshotResponse, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}

m.lg.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()

resp, err := ss.Recv()
if err != nil {
m.logAndCloseWithError(err, pw)
}
go func() {
// Saving response is blocking
err = m.save(resp, pw)
if err != nil {
m.logAndCloseWithError(err, pw)
return
}
for {
resp, err := ss.Recv()
if err != nil {
switch err {
case io.EOF:
m.lg.Info("completed snapshot read; closing")
default:
m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
}
pw.CloseWithError(err)
m.logAndCloseWithError(err, pw)
return
}
err = m.save(resp, pw)
if err != nil {
m.logAndCloseWithError(err, pw)
return
}
}
}()
return &SnapshotResponse{
Header: resp.Header,
Snapshot: &snapshotReadCloser{ctx: ctx, ReadCloser: pr},
Version: resp.Version,
}, err
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
if err != nil {
return nil, toErr(ctx, err)
}

// can "resp == nil && err == nil"
// before we receive snapshot SHA digest?
// No, server sends EOF with an empty response
// after it sends SHA digest at the end
m.lg.Info("opened snapshot stream; downloading")
pr, pw := io.Pipe()

if _, werr := pw.Write(resp.Blob); werr != nil {
pw.CloseWithError(werr)
go func() {
for {
resp, err := ss.Recv()
if err != nil {
m.logAndCloseWithError(err, pw)
return
}
err = m.save(resp, pw)
if err != nil {
m.logAndCloseWithError(err, pw)
return
}
}
}()
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, err
}

func (m *maintenance) logAndCloseWithError(err error, pw *io.PipeWriter) {
switch err {
case io.EOF:
m.lg.Info("completed snapshot read; closing")
default:
m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
}
pw.CloseWithError(err)
}

func (m *maintenance) save(resp *pb.SnapshotResponse, pw *io.PipeWriter) error {
// can "resp == nil && err == nil"
// before we receive snapshot SHA digest?
// No, server sends EOF with an empty response
// after it sends SHA digest at the end

if _, werr := pw.Write(resp.Blob); werr != nil {
return werr
}
return nil
}

type snapshotReadCloser struct {
Expand Down
47 changes: 31 additions & 16 deletions client/v3/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,25 @@ func hasChecksum(n int64) bool {
return (n % 512) == sha256.Size
}

// Save fetches snapshot from remote etcd server and saves data
// to target path. If the context "ctx" is canceled or timed out,
// SaveWithVersion fetches snapshot from remote etcd server, saves data
// to target path and returns server version. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error {
// Etcd <v3.6 will return "" as version.
func SaveWithVersion(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) (version string, err error) {
if lg == nil {
lg = zap.NewExample()
}
cfg.Logger = lg.Named("client")
if len(cfg.Endpoints) != 1 {
return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
return "", fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
}
cli, err := clientv3.New(cfg)
if err != nil {
return err
return "", err
}
defer cli.Close()

Expand All @@ -63,40 +64,54 @@ func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath strin
var f *os.File
f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
if err != nil {
return fmt.Errorf("could not open %s (%v)", partpath, err)
return "", fmt.Errorf("could not open %s (%v)", partpath, err)
}
lg.Info("created temporary db file", zap.String("path", partpath))

now := time.Now()
var rd io.ReadCloser
rd, err = cli.Snapshot(ctx)
resp, err := cli.SnapshotWithVersion(ctx)
if err != nil {
return err
return resp.Version, err
}
defer resp.Snapshot.Close()
lg.Info("fetching snapshot", zap.String("endpoint", cfg.Endpoints[0]))
var size int64
size, err = io.Copy(f, rd)
size, err = io.Copy(f, resp.Snapshot)
if err != nil {
return err
return resp.Version, err
}
if !hasChecksum(size) {
return fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
return resp.Version, fmt.Errorf("sha256 checksum not found [bytes: %d]", size)
}
if err = fileutil.Fsync(f); err != nil {
return err
return resp.Version, err
}
if err = f.Close(); err != nil {
return err
return resp.Version, err
}
lg.Info("fetched snapshot",
zap.String("endpoint", cfg.Endpoints[0]),
zap.String("size", humanize.Bytes(uint64(size))),
zap.String("took", humanize.Time(now)),
zap.String("etcd-version", version),
)

if err = os.Rename(partpath, dbPath); err != nil {
return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
return resp.Version, fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
}
lg.Info("saved", zap.String("path", dbPath))
return nil
return resp.Version, nil
}

// Save fetches snapshot from remote etcd server and saves data
// to target path. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
// Deprecated: Use SaveWithVersion instead.
func Save(ctx context.Context, lg *zap.Logger, cfg clientv3.Config, dbPath string) error {
_, err := SaveWithVersion(ctx, lg, cfg, dbPath)
return err
}
6 changes: 5 additions & 1 deletion etcdctl/ctlv3/command/snapshot_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,14 @@ func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
defer cancel()

path := args[0]
if err := snapshot.Save(ctx, lg, *cfg, path); err != nil {
version, err := snapshot.SaveWithVersion(ctx, lg, *cfg, path)
if err != nil {
cobrautl.ExitWithError(cobrautl.ExitInterrupted, err)
}
fmt.Printf("Snapshot saved at %s\n", path)
if version != "" {
fmt.Printf("Server version %s\n", version)
}
}

func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
Expand Down
10 changes: 5 additions & 5 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ import (

// Manager defines snapshot methods.
type Manager interface {
// Save fetches snapshot from remote etcd server and saves data
// to target path. If the context "ctx" is canceled or timed out,
// Save fetches snapshot from remote etcd server, saves data
// to target path and returns server version. If the context "ctx" is canceled or timed out,
// snapshot save stream will error out (e.g. context.Canceled,
// context.DeadlineExceeded). Make sure to specify only one endpoint
// in client configuration. Snapshot API must be requested to a
// selected node, and saved snapshot is the point-in-time state of
// the selected node.
Save(ctx context.Context, cfg clientv3.Config, dbPath string) error
Save(ctx context.Context, cfg clientv3.Config, dbPath string) (version string, err error)

// Status returns the snapshot file information.
Status(dbPath string) (Status, error)
Expand Down Expand Up @@ -96,8 +96,8 @@ func hasChecksum(n int64) bool {
}

// Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
return snapshot.Save(ctx, s.lg, cfg, dbPath)
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) (version string, err error) {
return snapshot.SaveWithVersion(ctx, s.lg, cfg, dbPath)
}

// Status is the snapshot file status.
Expand Down
5 changes: 4 additions & 1 deletion server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
ms.lg.Info("sending database snapshot to client",
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("etcd-version", version.Version),
)
for total-sent > 0 {
// buffer just holds read bytes from stream
Expand All @@ -151,6 +152,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
resp := &pb.SnapshotResponse{
RemainingBytes: uint64(total - sent),
Blob: buf[:n],
Version: version.Version,
}
if err = srv.Send(resp); err != nil {
return togRPCError(err)
Expand All @@ -166,7 +168,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
zap.Int64("total-bytes", total),
zap.Int("checksum-size", len(sha)),
)
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha, Version: version.Version}
if err := srv.Send(hresp); err != nil {
return togRPCError(err)
}
Expand All @@ -175,6 +177,7 @@ func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance
zap.Int64("total-bytes", total),
zap.String("size", size),
zap.String("took", humanize.Time(start)),
zap.String("etcd-version", version.Version),
)
return nil
}
Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,10 @@ func testIssue6361(t *testing.T, etcdutl bool) {
fpath := filepath.Join(t.TempDir(), "test.snapshot")

t.Log("etcdctl saving snapshot...")
if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil {
if err = spawnWithExpects(append(prefixArgs, "snapshot", "save", fpath),
fmt.Sprintf("Snapshot saved at %s", fpath),
"Server version 3.5.0",
); err != nil {
t.Fatal(err)
}

Expand Down
6 changes: 5 additions & 1 deletion tests/functional/rpcpb/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
)
now := time.Now()
mgr := snapshot.NewV3(lg)
if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
version, err := mgr.Save(context.Background(), *ccfg, m.SnapshotPath)
if err != nil {
return err
}
took := time.Since(now)
Expand All @@ -307,10 +308,12 @@ func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
SnapshotHash: int64(st.Hash),
SnapshotRevision: st.Revision,
Took: fmt.Sprintf("%v", took),
Version: version,
}
lg.Info(
"snapshot save END",
zap.String("member-name", m.SnapshotInfo.MemberName),
zap.String("member-version", m.SnapshotInfo.Version),
zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
Expand Down Expand Up @@ -357,6 +360,7 @@ func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
lg.Info(
"snapshot restore END",
zap.String("member-name", m.SnapshotInfo.MemberName),
zap.String("member-version", m.SnapshotInfo.Version),
zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
zap.String("snapshot-path", m.SnapshotPath),
zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
Expand Down
Loading

0 comments on commit e1b1d93

Please sign in to comment.