Skip to content

Commit

Permalink
init repo
Browse files Browse the repository at this point in the history
  • Loading branch information
Terry-Mao committed Jul 14, 2015
1 parent b884f20 commit cd75861
Show file tree
Hide file tree
Showing 12 changed files with 3,265 additions and 0 deletions.
315 changes: 315 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,315 @@
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package protorpc

import (
"bufio"
"errors"
proto "github.com/gogo/protobuf/proto"
"io"
"log"
"net"
"net/http"
"sync"
)

// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string

func (e ServerError) Error() string {
return string(e)
}

var ErrShutdown = errors.New("connection is shut down")

// Call represents an active RPC.
type Call struct {
ServiceMethod string // The name of the service and method to call.
Args proto.Message // The argument to the function (*struct).
Reply proto.Message // The reply from the function (*struct).
Error error // After completion, the error status.
Done chan *Call // Strobes when call is complete.
}

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
codec ClientCodec

reqMutex sync.Mutex // protects following
request Request

mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}

// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
type ClientCodec interface {
// WriteRequest must be safe for concurrent use by multiple goroutines.
WriteRequest(*Request, proto.Message) error
ReadResponseHeader(*Response) error
ReadResponseBody(proto.Message) error

Close() error
}

func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()

// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
call.Error = ErrShutdown
client.mutex.Unlock()
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()

// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}

func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()

switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}

func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
func NewClient(conn io.ReadWriteCloser) *Client {
return NewClientWithCodec(NewPbClientCodec(conn, bufio.NewReader(conn), bufio.NewWriter(conn)))
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}

// type gobClientCodec struct {
// rwc io.ReadWriteCloser
// dec *gob.Decoder
// enc *gob.Encoder
// encBuf *bufio.Writer
// }

// func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
// if err = c.enc.Encode(r); err != nil {
// return
// }
// if err = c.enc.Encode(body); err != nil {
// return
// }
// return c.encBuf.Flush()
// }

// func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
// return c.dec.Decode(r)
// }

// func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
// return c.dec.Decode(body)
// }

// func (c *gobClientCodec) Close() error {
// return c.rwc.Close()
// }

// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
func DialHTTP(network, address string) (*Client, error) {
return DialHTTPPath(network, address, DefaultRPCPath)
}

// DialHTTPPath connects to an HTTP RPC server
// at the specified network address and path.
func DialHTTPPath(network, address, path string) (*Client, error) {
var err error
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")

// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
conn.Close()
return nil, &net.OpError{
Op: "dial-http",
Net: network + " " + address,
Addr: nil,
Err: err,
}
}

// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}

func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args proto.Message, reply proto.Message, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args proto.Message, reply proto.Message) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
57 changes: 57 additions & 0 deletions client_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package protorpc

import (
"bufio"
"bytes"
"encoding/binary"
"github.com/gogo/protobuf/proto"
"io"
)

type pbClientCodec struct {
rwc io.ReadWriteCloser
reqBuf bytes.Buffer
argsBuf bytes.Buffer
wr *bufio.Writer
rr *bufio.Reader
packBuf [binary.MaxVarintLen32]byte
}

// NewPbClientCodec returns a new ClientCodec using Protobuf-RPC on conn.
func NewPbClientCodec(rwc io.ReadWriteCloser, rr *bufio.Reader, wr *bufio.Writer) ClientCodec {
p := new(pbClientCodec)
p.rwc = rwc
p.wr = wr
p.rr = rr
return p
}

func (c *pbClientCodec) WriteRequest(r *Request, p proto.Message) (err error) {
var (
rb, pb []byte
)
rb, err = marshal(&c.reqBuf, r)
if err != nil {
return
}
pb, err = marshal(&c.argsBuf, p)
if err != nil {
return
}
if err = sendFrame(c.wr, c.packBuf[:], rb, pb); err != nil {
return
}
return c.wr.Flush()
}

func (c *pbClientCodec) ReadResponseHeader(r *Response) error {
return recvFrame(c.rr, r)
}

func (c *pbClientCodec) ReadResponseBody(b proto.Message) error {
return recvFrame(c.rr, b)
}

func (c *pbClientCodec) Close() error {
return c.rwc.Close()
}
Loading

0 comments on commit cd75861

Please sign in to comment.