forked from HDT3213/godis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrename.go
140 lines (134 loc) · 4.92 KB
/
rename.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package cluster
import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)
// Rename handles the RENAME command: args[1] is the source key and args[2] the destination key.
// If both keys are picked onto the same node, the command is relayed to that node unchanged.
// Otherwise the rename is driven as a transaction over the two nodes: a "RenameFrom" prepare
// on the source node, a "RenameTo" prepare on the destination node, then a commit.
// Any failure triggers a rollback of the nodes prepared so far and is returned to the caller.
func Rename(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
	if len(args) != 3 {
		return protocol.MakeErrReply("ERR wrong number of arguments for 'rename' command")
	}
	from, to := string(args[1]), string(args[2])
	fromNode := cluster.peerPicker.PickNode(from)
	toNode := cluster.peerPicker.PickNode(to)
	if fromNode == toNode {
		// fast path: a single node owns both keys
		return cluster.relay(fromNode, c, args)
	}

	// participants to roll back, depending on how far the transaction got
	srcOnly := map[string][]string{fromNode: {from}}
	bothNodes := map[string][]string{
		fromNode: {from},
		toNode:   {to},
	}

	txID := cluster.idGenerator.NextID()
	txIDStr := strconv.FormatInt(txID, 10)

	// abort rolls back the given participants and hands back the reply for the client
	abort := func(participants map[string][]string, reply redis.Reply) redis.Reply {
		requestRollback(cluster, c, txID, participants)
		return reply
	}

	// phase 1: prepare on the source node
	fromReply := cluster.relayPrepare(fromNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", from))
	if protocol.IsErrorReply(fromReply) {
		return abort(srcOnly, fromReply)
	}
	dumped, isMultiBulk := fromReply.(*protocol.MultiBulkReply)
	if !isMultiBulk || len(dumped.Args) < 2 {
		return abort(srcOnly, protocol.MakeErrReply("ERR invalid prepare response"))
	}

	// phase 2: prepare on the destination node, forwarding the first two elements
	// of the source node's prepare reply
	toCmd := utils.ToCmdLine3("Prepare", []byte(txIDStr),
		[]byte("RenameTo"), []byte(to), dumped.Args[0], dumped.Args[1])
	toReply := cluster.relayPrepare(toNode, c, toCmd)
	if protocol.IsErrorReply(toReply) {
		return abort(bothNodes, toReply)
	}

	// phase 3: commit on both nodes
	_, errReply := requestCommit(cluster, c, txID, bothNodes)
	if errReply != nil {
		return abort(bothNodes, errReply)
	}
	return protocol.MakeOkReply()
}
// prepareRenameFrom is the prepare-function for RenameFrom, see prepareFuncMap.
// cmdLine must be exactly ["RenameFrom", key].
// It returns an error reply when the argument count is wrong, when the Exists command fails
// or yields an unexpected reply type, or when the key does not exist ("ERR no such key").
// Otherwise it returns the reply of the DumpKey command for the key.
func prepareRenameFrom(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 2 {
		return protocol.MakeArgNumErrReply("RenameFrom")
	}
	key := string(cmdLine[1])
	existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
	if protocol.IsErrorReply(existResp) {
		return existResp
	}
	// checked assertion: an unexpected reply type becomes an error reply instead of a panic
	existIntResp, ok := existResp.(*protocol.IntReply)
	if !ok {
		return protocol.MakeErrReply("ERR invalid exists response")
	}
	if existIntResp.Code == 0 {
		return protocol.MakeErrReply("ERR no such key")
	}
	return cluster.db.ExecWithLock(conn, utils.ToCmdLine2("DumpKey", key))
}
// prepareRenameNxTo is the prepare-function registered under "RenameNxTo".
// cmdLine must have exactly 4 elements; only cmdLine[1] (the destination key) is inspected here.
// It returns an error reply when the argument count is wrong, when the Exists command fails
// or yields an unexpected reply type, or when the destination key already exists (keyExistsErr).
// Otherwise it returns an OK reply.
func prepareRenameNxTo(cluster *Cluster, conn redis.Connection, cmdLine CmdLine) redis.Reply {
	if len(cmdLine) != 4 {
		return protocol.MakeArgNumErrReply("RenameNxTo")
	}
	key := string(cmdLine[1])
	existResp := cluster.db.ExecWithLock(conn, utils.ToCmdLine("Exists", key))
	if protocol.IsErrorReply(existResp) {
		return existResp
	}
	// checked assertion: an unexpected reply type becomes an error reply instead of a panic
	existIntResp, ok := existResp.(*protocol.IntReply)
	if !ok {
		return protocol.MakeErrReply("ERR invalid exists response")
	}
	if existIntResp.Code == 1 {
		return protocol.MakeErrReply(keyExistsErr)
	}
	return protocol.MakeOkReply()
}
func init() {
registerPrepareFunc("RenameFrom", prepareRenameFrom)
registerPrepareFunc("RenameNxTo", prepareRenameNxTo)
}
// RenameNx handles the RENAMENX command: it renames args[1] to args[2] only if the
// destination key does not exist.
// If both keys are picked onto the same node, the command is relayed to that node unchanged.
// Otherwise the rename is driven as a transaction over the two nodes: a "RenameFrom" prepare
// on the source node, a "RenameNxTo" prepare on the destination node, then a commit.
// On the cross-node path it returns integer 1 when the rename was committed and integer 0
// when the destination prepare reports keyExistsErr; any other failure is rolled back and
// returned as an error reply.
func RenameNx(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
	if len(args) != 3 {
		return protocol.MakeErrReply("ERR wrong number of arguments for 'renamenx' command")
	}
	srcKey := string(args[1])
	destKey := string(args[2])
	srcNode := cluster.peerPicker.PickNode(srcKey)
	destNode := cluster.peerPicker.PickNode(destKey)
	if srcNode == destNode {
		// fast path: a single node owns both keys
		return cluster.relay(srcNode, c, args)
	}
	groupMap := map[string][]string{
		srcNode:  {srcKey},
		destNode: {destKey},
	}
	txID := cluster.idGenerator.NextID()
	txIDStr := strconv.FormatInt(txID, 10)
	// prepare rename from
	srcPrepareResp := cluster.relayPrepare(srcNode, c, makeArgs("Prepare", txIDStr, "RenameFrom", srcKey))
	if protocol.IsErrorReply(srcPrepareResp) {
		// only the source node has been contacted so far, roll back just that node
		requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
		return srcPrepareResp
	}
	srcPrepareMBR, ok := srcPrepareResp.(*protocol.MultiBulkReply)
	if !ok || len(srcPrepareMBR.Args) < 2 {
		requestRollback(cluster, c, txID, map[string][]string{srcNode: {srcKey}})
		return protocol.MakeErrReply("ERR invalid prepare response")
	}
	// prepare rename to, forwarding the first two elements of the source node's prepare reply
	destPrepareResp := cluster.relayPrepare(destNode, c, utils.ToCmdLine3("Prepare", []byte(txIDStr),
		[]byte("RenameNxTo"), []byte(destKey), srcPrepareMBR.Args[0], srcPrepareMBR.Args[1]))
	if protocol.IsErrorReply(destPrepareResp) {
		// the source node has already prepared, so both participants are rolled back
		requestRollback(cluster, c, txID, groupMap)
		// checked assertion: a reply accepted by IsErrorReply that does not implement
		// protocol.ErrorReply is passed through instead of causing a panic
		if re, isErr := destPrepareResp.(protocol.ErrorReply); isErr && re.Error() == keyExistsErr {
			// destination already exists: RENAMENX reports 0 rather than an error
			return protocol.MakeIntReply(0)
		}
		return destPrepareResp
	}
	if _, errReply := requestCommit(cluster, c, txID, groupMap); errReply != nil {
		requestRollback(cluster, c, txID, groupMap)
		return errReply
	}
	return protocol.MakeIntReply(1)
}