Skip to content

Commit

Permalink
fix incorrect frame pointer usage
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jan 12, 2022
1 parent a635e1f commit 6775ef4
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 40 deletions.
16 changes: 12 additions & 4 deletions internal/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"bytes"
stderr "errors"
"io"

"github.com/spiral/errors"
Expand All @@ -16,7 +17,7 @@ func ReceiveFrame(relay io.Reader, fr *frame.Frame) error {

_, err := io.ReadFull(relay, fr.Header())
if err != nil {
return errors.E(op, err)
return err
}

if bytes.Equal(fr.Header(), res) {
Expand All @@ -37,6 +38,9 @@ func ReceiveFrame(relay io.Reader, fr *frame.Frame) error {
// read next part of the frame - options
_, err = io.ReadFull(relay, opts)
if err != nil {
if stderr.Is(err, io.EOF) {
return err
}
return errors.E(op, err)
}

Expand All @@ -57,10 +61,14 @@ func ReceiveFrame(relay io.Reader, fr *frame.Frame) error {
}

pb := get(pl)
_, err = io.ReadFull(relay, (*pb)[:pl])
if err != nil {
_, err2 := io.ReadFull(relay, (*pb)[:pl])
if err2 != nil {
if stderr.Is(err2, io.EOF) {
put(pl, pb)
return err
}
put(pl, pb)
return errors.E(op, err)
return errors.E(op, err2)
}

fr.WritePayload((*pb)[:pl])
Expand Down
99 changes: 69 additions & 30 deletions pkg/rpc/client_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ func TestClientServerProto(t *testing.T) {
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer func() {
err := client.Close()
if err != nil {
t.Fatal(err)
}
}()

keysP := &test.Payload{
Storage: "memory-rr",
Items: []*test.Item{
Expand All @@ -107,6 +100,13 @@ func TestClientServerProto(t *testing.T) {
item := &test.Item{}
assert.NoError(t, client.Call("test123.ProtoMessage", keysP, item))
assert.Equal(t, "a", item.Key)

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}

func TestClientServerProtoError(t *testing.T) {
Expand All @@ -128,13 +128,6 @@ func TestClientServerProtoError(t *testing.T) {
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer func() {
err := client.Close()
if err != nil {
t.Fatal(err)
}
}()

keysP := &test.Payload{
Storage: "memory-rr",
Items: []*test.Item{
Expand All @@ -157,6 +150,13 @@ func TestClientServerProtoError(t *testing.T) {

item := &test.Item{}
assert.Error(t, client.Call("testError.ProtoMessage", keys, item))

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}

func TestClientServerJSON(t *testing.T) {
Expand All @@ -178,12 +178,6 @@ func TestClientServerJSON(t *testing.T) {
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer func() {
err := client.Close()
if err != nil {
t.Fatal(err)
}
}()

var rp = Payload{}
assert.NoError(t, client.Call("test2.Process", Payload{
Expand All @@ -195,6 +189,47 @@ func TestClientServerJSON(t *testing.T) {
assert.Equal(t, "NAME", rp.Name)
assert.Equal(t, -1000, rp.Value)
assert.Equal(t, "key", rp.Keys["value"])

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}

func TestClientServerRaw(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:18937")
assert.NoError(t, err)

go func() {
for {
conn, err2 := ln.Accept()
assert.NoError(t, err2)
rpc.ServeCodec(NewCodec(conn))
}
}()

err = rpc.RegisterName("testBinary", new(testService))
assert.NoError(t, err)

conn, err := net.Dial("tcp", "127.0.0.1:18937")
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))

data := make([]byte, 100000)
_, _ = rand.Read(data)

resp := make([]byte, 0, 10000)
assert.NoError(t, client.Call("testBinary.EchoBinary", data, &resp))

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}

func TestClientServerError(t *testing.T) {
Expand All @@ -216,16 +251,17 @@ func TestClientServerError(t *testing.T) {
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer func() {
err := client.Close()
if err != nil {
t.Fatal(err)
}
}()

err = client.Call("unknown", nil, nil)
assert.Error(t, err)
assert.Equal(t, "rpc: service/method request ill-formed: unknown", err.Error())

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}

func TestClientServerConcurrent(t *testing.T) {
Expand All @@ -249,10 +285,6 @@ func TestClientServerConcurrent(t *testing.T) {
assert.NoError(t, err)

client := rpc.NewClientWithCodec(NewClientCodec(conn))
defer func() {
err := client.Close()
assert.NoError(t, err)
}()

wg := &sync.WaitGroup{}
wg.Add(300)
Expand Down Expand Up @@ -363,4 +395,11 @@ func TestClientServerConcurrent(t *testing.T) {
}

wg2.Wait()

t.Cleanup(func() {
err2 := client.Close()
if err2 != nil {
t.Fatal(err2)
}
})
}
21 changes: 15 additions & 6 deletions pkg/rpc/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package rpc
import (
"bytes"
"encoding/gob"
stderr "errors"
"io"
"net/rpc"
"sync"

json "github.com/json-iterator/go"
"github.com/spiral/errors"

"github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/goridge/v3/pkg/socket"
Expand Down Expand Up @@ -127,23 +129,23 @@ func (c *Codec) WriteResponse(r *rpc.Response, body interface{}) error { //nolin
buf.WriteString(r.ServiceMethod)
buf.Write(data)

c.frame.WritePayloadLen(c.frame.Header(), uint32(buf.Len()))
c.frame.WritePayload(buf.Bytes())
fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WritePayload(buf.Bytes())
case *[]byte:
buf.Grow(len(*data) + len(r.ServiceMethod))
// writeServiceMethod to the buffer
buf.WriteString(r.ServiceMethod)
buf.Write(*data)

c.frame.WritePayloadLen(c.frame.Header(), uint32(buf.Len()))
c.frame.WritePayload(buf.Bytes())
fr.WritePayloadLen(fr.Header(), uint32(buf.Len()))
fr.WritePayload(buf.Bytes())
default:
return c.handleError(r, fr, "unknown Raw payload type")
}

// send buffer
c.frame.WriteCRC(c.frame.Header())
return c.relay.Send(c.frame)
fr.WriteCRC(fr.Header())
return c.relay.Send(fr)

case codec.(byte)&frame.CODEC_JSON != 0:
data, err := json.Marshal(body)
Expand Down Expand Up @@ -247,13 +249,20 @@ func (c *Codec) ReadRequestHeader(r *rpc.Request) error {

err := c.relay.Receive(f)
if err != nil {
if stderr.Is(err, io.EOF) {
c.putFrame(f)
return err
}

c.putFrame(f)
return err
}

// opts[0] sequence ID
// opts[1] service method name offset from payload in bytes
opts := f.ReadOptions(f.Header())
if len(opts) != 2 {
c.putFrame(f)
return errors.E(op, errors.Str("should be 2 options. SEQ_ID and METHOD_LEN"))
}

Expand Down

0 comments on commit 6775ef4

Please sign in to comment.