forked from HDT3213/godis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrewrite.go
161 lines (143 loc) · 3.86 KB
/
rewrite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package aof
import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/reply"
"io"
"io/ioutil"
"os"
"strconv"
"time"
)
// newRewriteHandler builds the scratch Handler used while rewriting.
// It points at the same aof file as the receiver but owns a fresh database
// obtained from tmpDBMaker, so loading the aof into it does not touch the
// receiver's own db field.
func (handler *Handler) newRewriteHandler() *Handler {
	return &Handler{
		aofFilename: handler.aofFilename,
		db:          handler.tmpDBMaker(),
	}
}
// RewriteCtx holds context of an AOF rewriting procedure.
// It is created by StartRewrite and then handed to DoRewrite and FinishRewrite.
type RewriteCtx struct {
// tmpFile is the temporary file created by StartRewrite; the rewritten aof
// content is written into it.
tmpFile *os.File
// fileSize is the size of the aof file as measured by StartRewrite.
// DoRewrite passes it to LoadAof and FinishRewrite seeks to this offset
// in the aof file before copying the remainder into tmpFile.
fileSize int64
dbIdx int // handler.currentDB at the moment StartRewrite ran
}
// Rewrite carries out AOF rewrite.
//
// It runs the three phases in order: StartRewrite prepares the context
// (including the temporary file), DoRewrite dumps the data into the temporary
// file, and FinishRewrite completes the procedure. Errors are logged, not
// returned; if a phase fails the following phases are skipped.
func (handler *Handler) Rewrite() {
	ctx, err := handler.StartRewrite()
	if err != nil {
		logger.Warn(err)
		return
	}
	if err = handler.DoRewrite(ctx); err != nil {
		logger.Error(err)
		// The rewrite is abandoned, so release the temporary file instead of
		// leaking its descriptor and leaving it on disk. Cleanup is best
		// effort: the failure that matters has already been logged above.
		_ = ctx.tmpFile.Close()
		_ = os.Remove(ctx.tmpFile.Name())
		return
	}
	handler.FinishRewrite(ctx)
}
// DoRewrite actually rewrites the aof file.
// It is public for testing only, please use Rewrite instead.
//
// The current aof file is loaded into a scratch handler (LoadAof is given
// ctx.fileSize, presumably as the number of bytes to replay - confirm against
// LoadAof), and then, for every configured database, a SELECT command followed
// by one command per entity (plus an expire command when the key has an
// expiration) is written to ctx.tmpFile.
//
// It returns the first error encountered while writing to the temporary file.
func (handler *Handler) DoRewrite(ctx *RewriteCtx) error {
	tmpFile := ctx.tmpFile

	// load aof into a scratch handler
	tmpAof := handler.newRewriteHandler()
	tmpAof.LoadAof(int(ctx.fileSize))

	// rewrite aof into tmpFile
	for i := 0; i < config.Properties.Databases; i++ {
		// select db
		data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
		if _, err := tmpFile.Write(data); err != nil {
			return err
		}

		// dump db
		var writeErr error
		tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
			// Once a write has failed nothing more must be written: a
			// partially written file must not be reported as success.
			// Returning false presumably stops the traversal; this guard
			// keeps the callback inert even if it does not.
			if writeErr != nil {
				return false
			}
			if cmd := EntityToCmd(key, entity); cmd != nil {
				if _, writeErr = tmpFile.Write(cmd.ToBytes()); writeErr != nil {
					return false
				}
			}
			if expiration != nil {
				if cmd := MakeExpireCmd(key, *expiration); cmd != nil {
					if _, writeErr = tmpFile.Write(cmd.ToBytes()); writeErr != nil {
						return false
					}
				}
			}
			return true
		})
		if writeErr != nil {
			return writeErr
		}
	}
	return nil
}
// StartRewrite prepares the rewrite procedure.
//
// While holding the pausingAof lock it syncs the aof file, records the
// current size of the aof file and the currently selected db index, and
// creates the temporary file that will receive the rewritten content.
//
// It returns the context for DoRewrite/FinishRewrite, or a nil context and
// the error if syncing, stat-ing the aof file, or creating the temporary file
// fails.
func (handler *Handler) StartRewrite() (*RewriteCtx, error) {
	handler.pausingAof.Lock() // pausing aof
	defer handler.pausingAof.Unlock()

	err := handler.aofFile.Sync()
	if err != nil {
		logger.Warn("fsync failed")
		return nil, err
	}

	// get current aof file size
	fileInfo, err := os.Stat(handler.aofFilename)
	if err != nil {
		// fileInfo is nil when Stat fails; bail out instead of dereferencing it
		logger.Warn("stat aof file failed")
		return nil, err
	}
	filesize := fileInfo.Size()

	// create tmp file
	file, err := ioutil.TempFile("", "*.aof")
	if err != nil {
		logger.Warn("tmp file create failed")
		return nil, err
	}
	return &RewriteCtx{
		tmpFile:  file,
		fileSize: filesize,
		dbIdx:    handler.currentDB,
	}, nil
}
// FinishRewrite finishes the rewrite procedure.
//
// While holding the pausingAof lock it:
//  1. writes a SELECT for ctx.dbIdx to the temporary file and appends
//     everything in the aof file past offset ctx.fileSize (the commands
//     written while the rewrite was running);
//  2. closes the temporary file and renames it over the aof file;
//  3. reopens the aof file for appending and writes a SELECT for
//     handler.currentDB to it.
//
// Failures in step 1 and 2 are logged and leave the existing aof file in
// place; the temporary file is closed and removed in that case. Failing to
// reopen or write the aof file in step 3 panics.
func (handler *Handler) FinishRewrite(ctx *RewriteCtx) {
	handler.pausingAof.Lock() // pausing aof
	defer handler.pausingAof.Unlock()

	tmpFile := ctx.tmpFile
	tmpClosed, replaced := false, false
	defer func() {
		// Best-effort cleanup: never leak the tmp file descriptor, and do not
		// leave the tmp file on disk unless it has become the aof file.
		if !tmpClosed {
			_ = tmpFile.Close()
		}
		if !replaced {
			_ = os.Remove(tmpFile.Name())
		}
	}()

	// write commands executed during rewriting to tmp file
	src, err := os.Open(handler.aofFilename)
	if err != nil {
		logger.Error("open aofFilename failed: " + err.Error())
		return
	}
	defer func() {
		_ = src.Close()
	}()
	_, err = src.Seek(ctx.fileSize, 0)
	if err != nil {
		logger.Error("seek failed: " + err.Error())
		return
	}

	// sync tmpFile's db index with online aofFile
	data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
	_, err = tmpFile.Write(data)
	if err != nil {
		logger.Error("tmp file rewrite failed: " + err.Error())
		return
	}
	// copy data
	_, err = io.Copy(tmpFile, src)
	if err != nil {
		logger.Error("copy aof filed failed: " + err.Error())
		return
	}

	// release the tmp file before it takes the place of the aof file
	tmpClosed = true
	if err = tmpFile.Close(); err != nil {
		logger.Error("close tmp file failed: " + err.Error())
		return
	}

	// replace current aof file by tmp file
	// (a close error is not actionable here: the file is reopened right below)
	_ = handler.aofFile.Close()
	if err = os.Rename(tmpFile.Name(), handler.aofFilename); err != nil {
		// the previous aof file is still in place; keep appending to it
		logger.Error("rename tmp file failed: " + err.Error())
	} else {
		replaced = true
	}

	// reopen aof file for further write
	aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		panic(err)
	}
	handler.aofFile = aofFile

	// reset selected db: write a SELECT command once more so that the db
	// selected in the aof file is consistent with handler.currentDB
	data = reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
	_, err = handler.aofFile.Write(data)
	if err != nil {
		panic(err)
	}
}