Skip to content

Commit

Permalink
rename godis/redis/reply to godis/redis/protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed Apr 10, 2022
1 parent c97f3aa commit 37ef7d8
Show file tree
Hide file tree
Showing 55 changed files with 774 additions and 774 deletions.
12 changes: 6 additions & 6 deletions aof/aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/parser"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"io"
"os"
"strconv"
Expand Down Expand Up @@ -78,15 +78,15 @@ func (handler *Handler) handleAof() {
handler.pausingAof.RLock() // prevent other goroutines from pausing aof
if p.dbIndex != handler.currentDB {
// select db
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
continue // skip this command
}
handler.currentDB = p.dbIndex
}
data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
data := protocol.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
Expand Down Expand Up @@ -135,13 +135,13 @@ func (handler *Handler) LoadAof(maxBytes int) {
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*reply.MultiBulkReply)
r, ok := p.Data.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
logger.Error("require multi bulk protocol")
continue
}
ret := handler.db.Exec(fakeConn, r.Args)
if reply.IsErrorReply(ret) {
if protocol.IsErrorReply(ret) {
logger.Error("exec err", err)
}
}
Expand Down
30 changes: 15 additions & 15 deletions aof/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import (
"github.com/hdt3213/godis/datastruct/set"
SortedSet "github.com/hdt3213/godis/datastruct/sortedset"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
"time"
)

// EntityToCmd serialize data entity to redis command
func EntityToCmd(key string, entity *database.DataEntity) *reply.MultiBulkReply {
func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply {
if entity == nil {
return nil
}
var cmd *reply.MultiBulkReply
var cmd *protocol.MultiBulkReply
switch val := entity.Data.(type) {
case []byte:
cmd = stringToCmd(key, val)
Expand All @@ -34,17 +34,17 @@ func EntityToCmd(key string, entity *database.DataEntity) *reply.MultiBulkReply

var setCmd = []byte("SET")

func stringToCmd(key string, bytes []byte) *reply.MultiBulkReply {
func stringToCmd(key string, bytes []byte) *protocol.MultiBulkReply {
args := make([][]byte, 3)
args[0] = setCmd
args[1] = []byte(key)
args[2] = bytes
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}

var rPushAllCmd = []byte("RPUSH")

func listToCmd(key string, list *List.LinkedList) *reply.MultiBulkReply {
func listToCmd(key string, list *List.LinkedList) *protocol.MultiBulkReply {
args := make([][]byte, 2+list.Len())
args[0] = rPushAllCmd
args[1] = []byte(key)
Expand All @@ -53,12 +53,12 @@ func listToCmd(key string, list *List.LinkedList) *reply.MultiBulkReply {
args[2+i] = bytes
return true
})
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}

var sAddCmd = []byte("SADD")

func setToCmd(key string, set *set.Set) *reply.MultiBulkReply {
func setToCmd(key string, set *set.Set) *protocol.MultiBulkReply {
args := make([][]byte, 2+set.Len())
args[0] = sAddCmd
args[1] = []byte(key)
Expand All @@ -68,12 +68,12 @@ func setToCmd(key string, set *set.Set) *reply.MultiBulkReply {
i++
return true
})
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}

var hMSetCmd = []byte("HMSET")

func hashToCmd(key string, hash dict.Dict) *reply.MultiBulkReply {
func hashToCmd(key string, hash dict.Dict) *protocol.MultiBulkReply {
args := make([][]byte, 2+hash.Len()*2)
args[0] = hMSetCmd
args[1] = []byte(key)
Expand All @@ -85,12 +85,12 @@ func hashToCmd(key string, hash dict.Dict) *reply.MultiBulkReply {
i++
return true
})
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}

var zAddCmd = []byte("ZADD")

func zSetToCmd(key string, zset *SortedSet.SortedSet) *reply.MultiBulkReply {
func zSetToCmd(key string, zset *SortedSet.SortedSet) *protocol.MultiBulkReply {
args := make([][]byte, 2+zset.Len()*2)
args[0] = zAddCmd
args[1] = []byte(key)
Expand All @@ -102,16 +102,16 @@ func zSetToCmd(key string, zset *SortedSet.SortedSet) *reply.MultiBulkReply {
i++
return true
})
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}

var pExpireAtBytes = []byte("PEXPIREAT")

// MakeExpireCmd generates command line to set expiration for the given key
func MakeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
func MakeExpireCmd(key string, expireAt time.Time) *protocol.MultiBulkReply {
args := make([][]byte, 3)
args[0] = pExpireAtBytes
args[1] = []byte(key)
args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
return reply.MakeMultiBulkReply(args)
return protocol.MakeMultiBulkReply(args)
}
8 changes: 4 additions & 4 deletions aof/rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"io"
"io/ioutil"
"os"
Expand Down Expand Up @@ -55,7 +55,7 @@ func (handler *Handler) DoRewrite(ctx *RewriteCtx) error {
// rewrite aof tmpFile
for i := 0; i < config.Properties.Databases; i++ {
// select db
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
_, err := tmpFile.Write(data)
if err != nil {
return err
Expand Down Expand Up @@ -128,7 +128,7 @@ func (handler *Handler) FinishRewrite(ctx *RewriteCtx) {
}

// sync tmpFile's db index with online aofFile
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil {
logger.Error("tmp file rewrite failed: " + err.Error())
Expand All @@ -153,7 +153,7 @@ func (handler *Handler) FinishRewrite(ctx *RewriteCtx) {
handler.aofFile = aofFile

// reset selected db 重新写入一次 select 指令保证 aof 中的数据库与 handler.currentDB 一致
data = reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
_, err = handler.aofFile.Write(data)
if err != nil {
panic(err)
Expand Down
22 changes: 11 additions & 11 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hdt3213/godis/lib/consistenthash"
"github.com/hdt3213/godis/lib/idgenerator"
"github.com/hdt3213/godis/lib/logger"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"github.com/jolestar/go-commons-pool/v2"
"runtime/debug"
"strconv"
Expand Down Expand Up @@ -97,35 +97,35 @@ func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &reply.UnknownErrReply{}
result = &protocol.UnknownErrReply{}
}
}()
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "auth" {
return database2.Auth(c, cmdLine[1:])
}
if !isAuthenticated(c) {
return reply.MakeErrReply("NOAUTH Authentication required")
return protocol.MakeErrReply("NOAUTH Authentication required")
}

if cmdName == "multi" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
return protocol.MakeArgNumErrReply(cmdName)
}
return database2.StartMulti(c)
} else if cmdName == "discard" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
return protocol.MakeArgNumErrReply(cmdName)
}
return database2.DiscardMulti(c)
} else if cmdName == "exec" {
if len(cmdLine) != 1 {
return reply.MakeArgNumErrReply(cmdName)
return protocol.MakeArgNumErrReply(cmdName)
}
return execMulti(cluster, c, nil)
} else if cmdName == "select" {
if len(cmdLine) != 2 {
return reply.MakeArgNumErrReply(cmdName)
return protocol.MakeArgNumErrReply(cmdName)
}
return execSelect(c, cmdLine)
}
Expand All @@ -134,7 +134,7 @@ func (cluster *Cluster) Exec(c redis.Connection, cmdLine [][]byte) (result redis
}
cmdFunc, ok := router[cmdName]
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode")
return protocol.MakeErrReply("ERR unknown command '" + cmdName + "', or not supported in cluster mode")
}
result = cmdFunc(cluster, c, cmdLine)
return
Expand Down Expand Up @@ -178,11 +178,11 @@ func (cluster *Cluster) groupBy(keys []string) map[string][]string {
func execSelect(c redis.Connection, args [][]byte) redis.Reply {
dbIndex, err := strconv.Atoi(string(args[1]))
if err != nil {
return reply.MakeErrReply("ERR invalid DB index")
return protocol.MakeErrReply("ERR invalid DB index")
}
if dbIndex >= config.Properties.Databases {
return reply.MakeErrReply("ERR DB index is out of range")
return protocol.MakeErrReply("ERR DB index is out of range")
}
c.SelectDB(dbIndex)
return reply.MakeOkReply()
return protocol.MakeOkReply()
}
4 changes: 2 additions & 2 deletions cluster/com.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/client"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ func (cluster *Cluster) relay(peer string, c redis.Connection, args [][]byte) re
}
peerClient, err := cluster.getPeerClient(peer)
if err != nil {
return reply.MakeErrReply(err.Error())
return protocol.MakeErrReply(err.Error())
}
defer func() {
_ = cluster.returnPeerClient(peer, peerClient)
Expand Down
2 changes: 1 addition & 1 deletion cluster/com_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/lib/utils"
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/reply/asserts"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)

Expand Down
10 changes: 5 additions & 5 deletions cluster/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package cluster

import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
"strconv"
)

// Del atomically removes given writeKeys from cluster, writeKeys can be distributed on any node
// if the given writeKeys are distributed on different node, Del will use try-commit-catch to remove them
func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'del' command")
return protocol.MakeErrReply("ERR wrong number of arguments for 'del' command")
}
keys := make([]string, len(args)-1)
for i := 1; i < len(args); i++ {
Expand All @@ -36,7 +36,7 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
} else {
resp = cluster.relay(peer, c, makeArgs("Prepare", peerArgs...))
}
if reply.IsErrorReply(resp) {
if protocol.IsErrorReply(resp) {
errReply = resp
rollback = true
break
Expand All @@ -56,10 +56,10 @@ func Del(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if !rollback {
var deleted int64 = 0
for _, resp := range respList {
intResp := resp.(*reply.IntReply)
intResp := resp.(*protocol.IntReply)
deleted += intResp.Code
}
return reply.MakeIntReply(int64(deleted))
return protocol.MakeIntReply(int64(deleted))
}
return errReply
}
2 changes: 1 addition & 1 deletion cluster/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cluster

import (
"github.com/hdt3213/godis/redis/connection"
"github.com/hdt3213/godis/redis/reply/asserts"
"github.com/hdt3213/godis/redis/protocol/asserts"
"testing"
)

Expand Down
12 changes: 6 additions & 6 deletions cluster/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@ package cluster

import (
"github.com/hdt3213/godis/interface/redis"
"github.com/hdt3213/godis/redis/reply"
"github.com/hdt3213/godis/redis/protocol"
)

// FlushDB removes all data in current database
func FlushDB(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
replies := cluster.broadcast(c, args)
var errReply reply.ErrorReply
var errReply protocol.ErrorReply
for _, v := range replies {
if reply.IsErrorReply(v) {
errReply = v.(reply.ErrorReply)
if protocol.IsErrorReply(v) {
errReply = v.(protocol.ErrorReply)
break
}
}
if errReply == nil {
return &reply.OkReply{}
return &protocol.OkReply{}
}
return reply.MakeErrReply("error occurs: " + errReply.Error())
return protocol.MakeErrReply("error occurs: " + errReply.Error())
}

// FlushAll removes all data in cluster
Expand Down
Loading

0 comments on commit 37ef7d8

Please sign in to comment.