Skip to content

Commit

Permalink
support for large requests/responses
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk committed Jun 25, 2015
1 parent 6d92fdc commit ffbe565
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
46 changes: 30 additions & 16 deletions rpc/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,30 @@ package codec

import (
"encoding/json"
"fmt"
"net"
"time"

"github.com/ethereum/go-ethereum/rpc/shared"
)

const (
MAX_REQUEST_SIZE = 1024 * 1024
MAX_REQUEST_SIZE = 1024 * 1024
MAX_RESPONSE_SIZE = 1024 * 1024
)

// Json serialization support
type JsonCodec struct {
c net.Conn
buffer []byte
c net.Conn
buffer []byte
bytesInBuffer int
}

// Create new JSON coder instance
func NewJsonCoder(conn net.Conn) ApiCoder {
return &JsonCodec{
c: conn,
buffer: make([]byte, MAX_REQUEST_SIZE),
c: conn,
buffer: make([]byte, MAX_REQUEST_SIZE),
bytesInBuffer: 0,
}
}
Expand Down Expand Up @@ -58,28 +60,40 @@ func (self *JsonCodec) ReadRequest() (requests []*shared.Request, isBatch bool,
}

func (self *JsonCodec) ReadResponse() (interface{}, error) {
var err error
bytesInBuffer := 0
buf := make([]byte, MAX_RESPONSE_SIZE)
n, _ := self.c.Read(buf)

var failure shared.ErrorResponse
if err = json.Unmarshal(buf[:n], &failure); err == nil && failure.Error != nil {
return failure, nil
}
deadline := time.Now().Add(15 * time.Second)
self.c.SetDeadline(deadline)

for {
n, err := self.c.Read(buf[bytesInBuffer:])
if err != nil {
return nil, err
}
bytesInBuffer += n

var success shared.SuccessResponse
if err = json.Unmarshal(buf[:n], &success); err == nil {
return success, nil
var success shared.SuccessResponse
if err = json.Unmarshal(buf[:bytesInBuffer], &success); err == nil {
return success, nil
}

var failure shared.ErrorResponse
if err = json.Unmarshal(buf[:bytesInBuffer], &failure); err == nil && failure.Error != nil {
return failure, nil
}
}

return nil, err
self.c.Close()
return nil, fmt.Errorf("Unable to read response")
}

// Encode response to encoded form in underlying stream
// Decode data
func (self *JsonCodec) Decode(data []byte, msg interface{}) error {
return json.Unmarshal(data, msg)
}

// Encode message
func (self *JsonCodec) Encode(msg interface{}) ([]byte, error) {
return json.Marshal(msg)
}
Expand Down
1 change: 1 addition & 0 deletions rpc/comms/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type IpcConfig struct {

type ipcClient struct {
endpoint string
c net.Conn
codec codec.Codec
coder codec.ApiCoder
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/comms/ipc_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newIpcClient(cfg IpcConfig, codec codec.Codec) (*ipcClient, error) {
return nil, err
}

return &ipcClient{cfg.Endpoint, codec, codec.New(c)}, nil
return &ipcClient{cfg.Endpoint, c, codec, codec.New(c)}, nil
}

func (self *ipcClient) reconnect() error {
Expand Down

0 comments on commit ffbe565

Please sign in to comment.