forked from HDT3213/godis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrewrite.go
148 lines (132 loc) · 3.65 KB
/
rewrite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package aof
import (
"io"
"os"
"strconv"
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
)
func (persister *Persister) newRewriteHandler() *Persister {
h := &Persister{}
h.aofFilename = persister.aofFilename
h.db = persister.tmpDBMaker()
return h
}
// RewriteCtx holds context of an AOF rewriting procedure
type RewriteCtx struct {
tmpFile *os.File // tmpFile is the file handler of aof tmpFile
fileSize int64
dbIdx int // selected db index when startRewrite
}
// Rewrite carries out AOF rewrite
func (persister *Persister) Rewrite() error {
ctx, err := persister.StartRewrite()
if err != nil {
return err
}
err = persister.DoRewrite(ctx)
if err != nil {
return err
}
persister.FinishRewrite(ctx)
return nil
}
// DoRewrite actually rewrite aof file
// makes DoRewrite public for testing only, please use Rewrite instead
func (persister *Persister) DoRewrite(ctx *RewriteCtx) (err error) {
// start rewrite
if !config.Properties.AofUseRdbPreamble {
logger.Info("generate aof preamble")
err = persister.generateAof(ctx)
} else {
logger.Info("generate rdb preamble")
err = persister.generateRDB(ctx)
}
return err
}
// StartRewrite prepares rewrite procedure
func (persister *Persister) StartRewrite() (*RewriteCtx, error) {
// pausing aof
persister.pausingAof.Lock()
defer persister.pausingAof.Unlock()
err := persister.aofFile.Sync()
if err != nil {
logger.Warn("fsync failed")
return nil, err
}
// get current aof file size
fileInfo, _ := os.Stat(persister.aofFilename)
filesize := fileInfo.Size()
// create tmp file
file, err := os.CreateTemp(config.GetTmpDir(), "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
dbIdx: persister.currentDB,
}, nil
}
// FinishRewrite finish rewrite procedure
func (persister *Persister) FinishRewrite(ctx *RewriteCtx) {
persister.pausingAof.Lock() // pausing aof
defer persister.pausingAof.Unlock()
tmpFile := ctx.tmpFile
// copy commands executed during rewriting to tmpFile
errOccurs := func() bool {
/* read write commands executed during rewriting */
src, err := os.Open(persister.aofFilename)
if err != nil {
logger.Error("open aofFilename failed: " + err.Error())
return true
}
defer func() {
_ = src.Close()
_ = tmpFile.Close()
}()
_, err = src.Seek(ctx.fileSize, 0)
if err != nil {
logger.Error("seek failed: " + err.Error())
return true
}
// sync tmpFile's db index with online aofFile
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil {
logger.Error("tmp file rewrite failed: " + err.Error())
return true
}
// copy data
_, err = io.Copy(tmpFile, src)
if err != nil {
logger.Error("copy aof filed failed: " + err.Error())
return true
}
return false
}()
if errOccurs {
return
}
// replace current aof file by tmp file
_ = persister.aofFile.Close()
if err := os.Rename(tmpFile.Name(), persister.aofFilename); err != nil {
logger.Warn(err)
}
// reopen aof file for further write
aofFile, err := os.OpenFile(persister.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
persister.aofFile = aofFile
// write select command again to resume aof file selected db
// it should have the same db index with persister.currentDB
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(persister.currentDB))).ToBytes()
_, err = persister.aofFile.Write(data)
if err != nil {
panic(err)
}
}