Skip to content

Commit

Permalink
N chanrpc.Client <-> N chanrpc.Server
Browse files Browse the repository at this point in the history
  • Loading branch information
name5566 committed Jul 25, 2016
1 parent 83adb73 commit 8ea5616
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 31 deletions.
78 changes: 55 additions & 23 deletions chanrpc/chanrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type RetInfo struct {
}

type Client struct {
s *Server
chanSyncRet chan *RetInfo
chanAsynRet chan *RetInfo
s *Server
chanSyncRet chan *RetInfo
ChanAsynRet chan *RetInfo
pendingAsynCall int
}

func NewServer(l int) *Server {
Expand Down Expand Up @@ -151,14 +152,23 @@ func (s *Server) Close() {
}

// goroutine safe
func (s *Server) Open(chanAsynRet chan *RetInfo) *Client {
func (s *Server) Open(l int) *Client {
c := NewClient(l)
c.Attach(s)
return c
}

func NewClient(l int) *Client {
c := new(Client)
c.s = s
c.chanSyncRet = make(chan *RetInfo, 1)
c.chanAsynRet = chanAsynRet
c.ChanAsynRet = make(chan *RetInfo, l)
return c
}

func (c *Client) Attach(s *Server) {
c.s = s
}

func (c *Client) call(ci *CallInfo, block bool) (err error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -179,6 +189,11 @@ func (c *Client) call(ci *CallInfo, block bool) (err error) {
}

func (c *Client) f(id interface{}, n int) (f interface{}, err error) {
if c.s == nil {
err = errors.New("server not attached")
return
}

f = c.s.functions[id]
if f == nil {
err = fmt.Errorf("function id %v: function not registered", id)
Expand Down Expand Up @@ -263,48 +278,54 @@ func (c *Client) CallN(id interface{}, args ...interface{}) ([]interface{}, erro
func (c *Client) asynCall(id interface{}, args []interface{}, cb interface{}, n int) {
f, err := c.f(id, n)
if err != nil {
c.chanAsynRet <- &RetInfo{err: err, cb: cb}
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}

err = c.call(&CallInfo{
f: f,
args: args,
chanRet: c.chanAsynRet,
chanRet: c.ChanAsynRet,
cb: cb,
}, false)
if err != nil {
c.chanAsynRet <- &RetInfo{err: err, cb: cb}
c.ChanAsynRet <- &RetInfo{err: err, cb: cb}
return
}
}

func (c *Client) AsynCall(id interface{}, _args ...interface{}) {
func (c *Client) AsynCall(id interface{}, _args ...interface{}) error {
if len(_args) < 1 {
panic("callback function not found")
}

// args
var args []interface{}
if len(_args) > 1 {
args = _args[:len(_args)-1]
return errors.New("callback function not found")
}

// cb
args := _args[:len(_args)-1]
cb := _args[len(_args)-1]

var n int
switch cb.(type) {
case func(error):
c.asynCall(id, args, cb, 0)
n = 0
case func(interface{}, error):
c.asynCall(id, args, cb, 1)
n = 1
case func([]interface{}, error):
c.asynCall(id, args, cb, 2)
n = 2
default:
panic("definition of callback function is invalid")
return errors.New("definition of callback function is invalid")
}

// too many calls
if c.pendingAsynCall >= cap(c.ChanAsynRet) {
err := execCb(&RetInfo{err: errors.New("too many calls"), cb: cb})
return err
}

c.asynCall(id, args, cb, n)
c.pendingAsynCall++
return nil
}

func ExecCb(ri *RetInfo) (err error) {
func execCb(ri *RetInfo) (err error) {
defer func() {
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
Expand All @@ -330,3 +351,14 @@ func ExecCb(ri *RetInfo) (err error) {
}
return
}

func (c *Client) Cb(ri *RetInfo) error {
c.pendingAsynCall--
return execCb(ri)
}

func (c *Client) Close() {
for c.pendingAsynCall > 0 {
c.Cb(<-c.ChanAsynRet)
}
}
11 changes: 5 additions & 6 deletions chanrpc/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func Example() {

// goroutine 2
go func() {
chanAsynRet := make(chan *chanrpc.RetInfo, 10)
c := s.Open(chanAsynRet)
c := s.Open(10)

// sync
err := c.Call0("f0")
Expand Down Expand Up @@ -108,10 +107,10 @@ func Example() {
}
})

chanrpc.ExecCb(<-chanAsynRet)
chanrpc.ExecCb(<-chanAsynRet)
chanrpc.ExecCb(<-chanAsynRet)
chanrpc.ExecCb(<-chanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)
c.Cb(<-c.ChanAsynRet)

// go
s.Go("f0")
Expand Down
2 changes: 1 addition & 1 deletion console/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *ExternalCommand) run(_args []string) string {
args[i] = v
}

ret, err := c.server.Open(nil).Call1(c._name, args...)
ret, err := c.server.Open(0).Call1(c._name, args...)
if err != nil {
return err.Error()
}
Expand Down
2 changes: 1 addition & 1 deletion gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (a *agent) Run() {

func (a *agent) OnClose() {
if a.gate.AgentChanRPC != nil {
err := a.gate.AgentChanRPC.Open(nil).Call0("CloseAgent", a)
err := a.gate.AgentChanRPC.Open(0).Call0("CloseAgent", a)
if err != nil {
log.Error("chanrpc error: %v", err)
}
Expand Down

0 comments on commit 8ea5616

Please sign in to comment.