Skip to content

Commit

Permalink
refactor resp writing even further by making conn have a writeBuffer …
Browse files Browse the repository at this point in the history
…pre-allocated which resp simply appends to
  • Loading branch information
Brian Picciano committed May 9, 2015
1 parent 553467c commit 22b3950
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 120 deletions.
6 changes: 5 additions & 1 deletion redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Client struct {
reader *bufio.Reader
pending []*request
completed []*Reply
writeBuf []byte
}

// request describes a client's request to the redis server
Expand All @@ -51,6 +52,7 @@ func DialTimeout(network, addr string, timeout time.Duration) (*Client, error) {
c.Conn = conn
c.timeout = timeout
c.reader = bufio.NewReaderSize(conn, bufSize)
c.writeBuf = make([]byte, 0, 1024)
return c, nil
}

Expand Down Expand Up @@ -152,7 +154,9 @@ func (c *Client) writeRequest(requests ...*request) error {
req := make([]interface{}, 0, len(requests[i].args)+1)
req = append(req, requests[i].cmd)
req = append(req, requests[i].args...)
err := resp.WriteArbitraryAsFlattenedStrings(c.Conn, req)
buf := resp.AppendArbitraryAsFlattenedStrings(c.writeBuf[:0], req)

_, err := c.Conn.Write(buf)
if err != nil {
c.Close()
return err
Expand Down
244 changes: 125 additions & 119 deletions redis/resp/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ const (
Nil
)

const (
simpleStrPrefix = '+'
errPrefix = '-'
intPrefix = ':'
bulkStrPrefix = '$'
arrayPrefix = '*'
var (
simpleStrPrefix = []byte{'+'}
errPrefix = []byte{'-'}
intPrefix = []byte{':'}
bulkStrPrefix = []byte{'$'}
arrayPrefix = []byte{'*'}
)

// Parse errors
Expand Down Expand Up @@ -88,15 +88,15 @@ func bufioReadMessage(r *bufio.Reader) (*Message, error) {
return nil, err
}
switch b[0] {
case simpleStrPrefix:
case simpleStrPrefix[0]:
return readSimpleStr(r)
case errPrefix:
case errPrefix[0]:
return readError(r)
case intPrefix:
case intPrefix[0]:
return readInt(r)
case bulkStrPrefix:
case bulkStrPrefix[0]:
return readBulkStr(r)
case arrayPrefix:
case arrayPrefix[0]:
return readArray(r)
default:
return nil, badType
Expand Down Expand Up @@ -263,16 +263,52 @@ func WriteMessage(w io.Writer, m *Message) error {
return err
}

// AppendArbitrary takes in any primitive golang value, or Message, and appends
// its encoded form to the given buffer, inferring types where appropriate. It
// then returns the appended buffer
func AppendArbitrary(buf []byte, m interface{}) []byte {
return appendArb(buf, m, false, false)
}

// WriteArbitrary takes in any primitive golang value, or Message, and writes
// its encoded form to the given io.Writer, inferring types where appropriate.
func WriteArbitrary(w io.Writer, m interface{}) error {
return write(w, m, false, false)
buf := AppendArbitrary(make([]byte, 0, 1024), m)
_, err := w.Write(buf)
return err
}

// AppendArbitraryAsString is similar to AppendArbitraryAsFlattenedString except
// that it won't flatten any embedded arrays.
func AppendArbitraryAsStrings(buf []byte, m interface{}) []byte {
return appendArb(buf, m, true, false)
}

// WriteArbitraryAsString is similar to WriteArbitraryAsFlattenedString except
// that it won't flatten any embedded arrays.
func WriteArbitraryAsString(w io.Writer, m interface{}) error {
return write(w, m, true, false)
buf := AppendArbitraryAsStrings(make([]byte, 0, 1024), m)
_, err := w.Write(buf)
return err
}

// AppendArbitraryAsFlattenedStrings is similar to AppendArbitrary except that
// it will encode all types except Array as a BulkStr, converting the argument
// into a string first as necessary. It will also flatten any embedded arrays
// into a single long array. This is useful because commands to a redis server
// must be given as an array of bulk strings. If the argument isn't already in a
// slice or map it will be wrapped so that it is written as an Array of size
// one.
//
// Note that if a Message type is found it will *not* be encoded to a BulkStr,
// but will simply be passed through as whatever type it already represents.
func AppendArbitraryAsFlattenedStrings(buf []byte, m interface{}) []byte {
fl := flattenedLength(m)
buf = append(buf, arrayPrefix...)
buf = strconv.AppendInt(buf, int64(fl), 10)
buf = append(buf, delim...)

return appendArb(buf, m, true, true)
}

// WriteArbitraryAsFlattenedStrings is similar to WriteArbitrary except that it
Expand All @@ -285,159 +321,126 @@ func WriteArbitraryAsString(w io.Writer, m interface{}) error {
// Note that if a Message type is found it will *not* be encoded to a BulkStr,
// but will simply be passed through as whatever type it already represents.
func WriteArbitraryAsFlattenedStrings(w io.Writer, m interface{}) error {
fl := flattenedLength(m)
var err error
err = writeBytesHelper(w, []byte("*"), err)
err = writeBytesHelper(w, []byte(strconv.Itoa(fl)), err)
err = writeBytesHelper(w, []byte("\r\n"), err)
if err != nil {
return err
}

return write(w, m, true, true)
buf := AppendArbitraryAsFlattenedStrings(make([]byte, 0, 1024), m)
_, err := w.Write(buf)
return err
}

func write(w io.Writer, m interface{}, forceString, flattened bool) error {
func appendArb(buf []byte, m interface{}, forceString, flattened bool) []byte {
switch mt := m.(type) {
case []byte:
return writeStr(w, mt)
return appendStr(buf, mt)
case string:
return writeStr(w, []byte(mt))
return appendStr(buf, []byte(mt))
case bool:
if mt {
return writeStr(w, []byte("1"))
return appendStr(buf, []byte("1"))
} else {
return writeStr(w, []byte("0"))
return appendStr(buf, []byte("0"))
}
case nil:
if forceString {
return writeStr(w, []byte{})
return appendStr(buf, []byte{})
} else {
return writeNil(w)
return appendNil(buf)
}
case int:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case int8:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case int16:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case int32:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case int64:
return writeInt(w, mt, forceString)
return appendInt(buf, mt, forceString)
case uint:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case uint8:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case uint16:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case uint32:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case uint64:
return writeInt(w, int64(mt), forceString)
return appendInt(buf, int64(mt), forceString)
case float32:
ft := strconv.FormatFloat(float64(mt), 'f', -1, 32)
return writeStr(w, []byte(ft))
return appendStr(buf, []byte(ft))
case float64:
ft := strconv.FormatFloat(mt, 'f', -1, 64)
return writeStr(w, []byte(ft))
return appendStr(buf, []byte(ft))
case error:
if forceString {
return writeStr(w, []byte(mt.Error()))
return appendStr(buf, []byte(mt.Error()))
} else {
return writeErr(w, mt)
return appendErr(buf, mt)
}

// For the following cases, where we are writing an array, we only write the
// array header (a new array) if flattened is false, otherwise we just write
// For the following cases, where we are writing an array, we only append the
// array header (a new array) if flattened is false, otherwise we just append
// each element inline and assume the array header has already been written

// We duplicate the below code here a bit, since this is the common case and
// it'd be better to not get the reflect package involved here
case []interface{}:
l := len(mt)
lstr := strconv.Itoa(l)

var err error

if !flattened {
err = writeBytesHelper(w, []byte("*"), err)
err = writeBytesHelper(w, []byte(lstr), err)
err = writeBytesHelper(w, []byte("\r\n"), err)
if err != nil {
return err
}
buf = append(buf, arrayPrefix...)
buf = strconv.AppendInt(buf, int64(l), 10)
buf = append(buf, delim...)
}

for i := 0; i < l; i++ {
if err = write(w, mt[i], forceString, flattened); err != nil {
return err
}
buf = appendArb(buf, mt[i], forceString, flattened)
}
return nil
return buf

case *Message:
_, err := w.Write(mt.raw)
return err
buf = append(buf, mt.raw...)
return buf

default:
// Fallback to reflect-based.
switch reflect.TypeOf(m).Kind() {
case reflect.Slice:
rm := reflect.ValueOf(mt)
l := rm.Len()
lstr := strconv.Itoa(l)

var err error

if !flattened {
err = writeBytesHelper(w, []byte("*"), err)
err = writeBytesHelper(w, []byte(lstr), err)
err = writeBytesHelper(w, []byte("\r\n"), err)
if err != nil {
return err
}
buf = append(buf, arrayPrefix...)
buf = strconv.AppendInt(buf, int64(l), 10)
buf = append(buf, delim...)
}

for i := 0; i < l; i++ {
vv := rm.Index(i).Interface()
if err = write(w, vv, forceString, flattened); err != nil {
return err
}
buf = appendArb(buf, vv, forceString, flattened)
}
return nil
return buf

case reflect.Map:
rm := reflect.ValueOf(mt)
l := rm.Len() * 2
lstr := strconv.Itoa(l)

var err error

if !flattened {
err = writeBytesHelper(w, []byte("*"), err)
err = writeBytesHelper(w, []byte(lstr), err)
err = writeBytesHelper(w, []byte("\r\n"), err)
if err != nil {
return err
}
buf = append(buf, arrayPrefix...)
buf = strconv.AppendInt(buf, int64(l), 10)
buf = append(buf, delim...)
}

keys := rm.MapKeys()
for _, k := range keys {
kv := k.Interface()
vv := rm.MapIndex(k).Interface()
if err = write(w, kv, forceString, flattened); err != nil {
return err
}
if err = write(w, vv, forceString, flattened); err != nil {
return err
}
buf = appendArb(buf, kv, forceString, flattened)
buf = appendArb(buf, vv, forceString, flattened)
}
return nil
return buf

default:
return writeStr(w, []byte(fmt.Sprint(m)))
return appendStr(buf, []byte(fmt.Sprint(m)))
}
}
}
Expand Down Expand Up @@ -479,41 +482,44 @@ func flattenedLength(m interface{}) int {
return total
}

func writeStr(w io.Writer, b []byte) error {
l := strconv.Itoa(len(b))
var err error
err = writeBytesHelper(w, []byte{bulkStrPrefix}, err)
err = writeBytesHelper(w, []byte(l), err)
err = writeBytesHelper(w, delim, err)
err = writeBytesHelper(w, b, err)
err = writeBytesHelper(w, delim, err)
return err
func appendStr(buf []byte, b []byte) []byte {
buf = append(buf, bulkStrPrefix...)
buf = strconv.AppendInt(buf, int64(len(b)), 10)
buf = append(buf, delim...)
buf = append(buf, b...)
buf = append(buf, delim...)
return buf
}

func writeErr(w io.Writer, ierr error) error {
ierrstr := []byte(ierr.Error())
var err error
err = writeBytesHelper(w, []byte{errPrefix}, err)
err = writeBytesHelper(w, ierrstr, err)
err = writeBytesHelper(w, delim, err)
return err
func appendErr(buf []byte, ierr error) []byte {
buf = append(buf, errPrefix...)
buf = append(buf, []byte(ierr.Error())...)
buf = append(buf, delim...)
return buf
}

func writeInt(w io.Writer, i int64, forceString bool) error {
istr := strconv.FormatInt(i, 10)
if forceString {
return writeStr(w, []byte(istr))
func appendInt(buf []byte, i int64, forceString bool) []byte {
if !forceString {
buf = append(buf, intPrefix...)
} else {
// Really want to avoid alloating a new []byte. So I write the int to
// the buf for the sole purpose of getting its length as a string, and
// even though it'll be immediately overwritten right after and
// AppendInt will be called again. This isn't great.
tmpBuf := strconv.AppendInt(buf, i, 10)

buf = append(buf, bulkStrPrefix...)
buf = strconv.AppendInt(buf, int64(len(tmpBuf)-len(buf)+1), 10)
buf = append(buf, delim...)
}
var err error
err = writeBytesHelper(w, []byte{intPrefix}, err)
err = writeBytesHelper(w, []byte(istr), err)
err = writeBytesHelper(w, delim, err)
return err

buf = strconv.AppendInt(buf, i, 10)
buf = append(buf, delim...)
return buf
}

var nilFormatted = []byte("$-1\r\n")

func writeNil(w io.Writer) error {
_, err := w.Write(nilFormatted)
return err
func appendNil(buf []byte) []byte {
return append(buf, nilFormatted...)
}

0 comments on commit 22b3950

Please sign in to comment.