Skip to content

Commit

Permalink
rpc: implement full bi-directional communication (ethereum#18471)
Browse files Browse the repository at this point in the history
New APIs added:

    client.RegisterName(namespace, service) // makes service available to server
    client.Notify(ctx, method, args...)     // sends a notification
    ClientFromContext(ctx)                  // to get a client in handler method

This is essentially a rewrite of the server-side code. JSON-RPC
processing code is now the same on both server and client side. Many
minor issues were fixed in the process and there is a new test suite for
JSON-RPC spec compliance (and non-compliance in some cases).

List of behavior changes:

- Method handlers are now called with a per-request context instead of a
  per-connection context. The context is canceled right after the method
  returns.
- Subscription error channels are always closed when the connection
  ends. There is no need to also wait on the Notifier's Closed channel
  to detect whether the subscription has ended.
- Client now omits "params" instead of sending "params": null when there
  are no arguments to a call. The previous behavior was not compliant
  with the spec. The server still accepts "params": null.
- Floating point numbers are allowed as "id". The spec doesn't allow
  them, but we handle request "id" as json.RawMessage and guarantee that
  the same number will be sent back.
- Logging is improved significantly. There is now a message at DEBUG
  level for each RPC call served.
  • Loading branch information
fjl authored Feb 4, 2019
1 parent ec3432b commit 245f314
Show file tree
Hide file tree
Showing 36 changed files with 2,149 additions and 2,107 deletions.
548 changes: 187 additions & 361 deletions rpc/client.go

Large diffs are not rendered by default.

145 changes: 60 additions & 85 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ import (
)

func TestClientRequest(t *testing.T) {
server := newTestServer("service", new(Service))
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()

var resp Result
if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
if err := client.Call(&resp, "test_echo", "hello", 10, &Args{"world"}); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
Expand All @@ -50,19 +50,19 @@ func TestClientRequest(t *testing.T) {
}

func TestClientBatchRequest(t *testing.T) {
server := newTestServer("service", new(Service))
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()

batch := []BatchElem{
{
Method: "service_echo",
Method: "test_echo",
Args: []interface{}{"hello", 10, &Args{"world"}},
Result: new(Result),
},
{
Method: "service_echo",
Method: "test_echo",
Args: []interface{}{"hello2", 11, &Args{"world"}},
Result: new(Result),
},
Expand All @@ -77,27 +77,38 @@ func TestClientBatchRequest(t *testing.T) {
}
wantResult := []BatchElem{
{
Method: "service_echo",
Method: "test_echo",
Args: []interface{}{"hello", 10, &Args{"world"}},
Result: &Result{"hello", 10, &Args{"world"}},
},
{
Method: "service_echo",
Method: "test_echo",
Args: []interface{}{"hello2", 11, &Args{"world"}},
Result: &Result{"hello2", 11, &Args{"world"}},
},
{
Method: "no_such_method",
Args: []interface{}{1, 2, 3},
Result: new(int),
Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
Error: &jsonError{Code: -32601, Message: "the method no_such_method does not exist/is not available"},
},
}
if !reflect.DeepEqual(batch, wantResult) {
t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
}
}

func TestClientNotify(t *testing.T) {
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()

if err := client.Notify(context.Background(), "test_echo", "hello", 10, &Args{"world"}); err != nil {
t.Fatal(err)
}
}

// func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
Expand All @@ -106,7 +117,12 @@ func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
// This test checks that requests made through CallContext can be canceled by canceling
// the context.
func testClientCancel(transport string, t *testing.T) {
server := newTestServer("service", new(Service))
// These tests take a lot of time, run them all at once.
// You probably want to run with -parallel 1 or comment out
// the call to t.Parallel if you enable the logging.
t.Parallel()

server := newTestServer()
defer server.Stop()

// What we want to achieve is that the context gets canceled
Expand Down Expand Up @@ -142,11 +158,6 @@ func testClientCancel(transport string, t *testing.T) {
panic("unknown transport: " + transport)
}

// These tests take a lot of time, run them all at once.
// You probably want to run with -parallel 1 or comment out
// the call to t.Parallel if you enable the logging.
t.Parallel()

// The actual test starts here.
var (
wg sync.WaitGroup
Expand Down Expand Up @@ -174,7 +185,8 @@ func testClientCancel(transport string, t *testing.T) {
}
// Now perform a call with the context.
// The key thing here is that no call will ever complete successfully.
err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
sleepTime := maxContextCancelTimeout + 20*time.Millisecond
err := client.CallContext(ctx, nil, "test_sleep", sleepTime)
if err != nil {
log.Debug(fmt.Sprint("got expected error:", err))
} else {
Expand All @@ -191,7 +203,7 @@ func testClientCancel(transport string, t *testing.T) {
}

func TestClientSubscribeInvalidArg(t *testing.T) {
server := newTestServer("service", new(Service))
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()
Expand Down Expand Up @@ -221,46 +233,14 @@ func TestClientSubscribeInvalidArg(t *testing.T) {
}

func TestClientSubscribe(t *testing.T) {
server := newTestServer("eth", new(NotificationTestService))
defer server.Stop()
client := DialInProc(server)
defer client.Close()

nc := make(chan int)
count := 10
sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
for i := 0; i < count; i++ {
if val := <-nc; val != i {
t.Fatalf("value mismatch: got %d, want %d", val, i)
}
}

sub.Unsubscribe()
select {
case v := <-nc:
t.Fatal("received value after unsubscribe:", v)
case err := <-sub.Err():
if err != nil {
t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
}
case <-time.After(1 * time.Second):
t.Fatalf("subscription not closed within 1s after unsubscribe")
}
}

func TestClientSubscribeCustomNamespace(t *testing.T) {
namespace := "custom"
server := newTestServer(namespace, new(NotificationTestService))
server := newTestServer()
defer server.Stop()
client := DialInProc(server)
defer client.Close()

nc := make(chan int)
count := 10
sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0)
sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
Expand All @@ -283,14 +263,17 @@ func TestClientSubscribeCustomNamespace(t *testing.T) {
}
}

// In this test, the connection drops while EthSubscribe is
// waiting for a response.
// In this test, the connection drops while Subscribe is waiting for a response.
func TestClientSubscribeClose(t *testing.T) {
service := &NotificationTestService{
server := newTestServer()
service := &notificationTestService{
gotHangSubscriptionReq: make(chan struct{}),
unblockHangSubscription: make(chan struct{}),
}
server := newTestServer("eth", service)
if err := server.RegisterName("nftest2", service); err != nil {
t.Fatal(err)
}

defer server.Stop()
client := DialInProc(server)
defer client.Close()
Expand All @@ -302,7 +285,7 @@ func TestClientSubscribeClose(t *testing.T) {
err error
)
go func() {
sub, err = client.EthSubscribe(context.Background(), nc, "hangSubscription", 999)
sub, err = client.Subscribe(context.Background(), "nftest2", nc, "hangSubscription", 999)
errc <- err
}()

Expand All @@ -313,27 +296,26 @@ func TestClientSubscribeClose(t *testing.T) {
select {
case err := <-errc:
if err == nil {
t.Errorf("EthSubscribe returned nil error after Close")
t.Errorf("Subscribe returned nil error after Close")
}
if sub != nil {
t.Error("EthSubscribe returned non-nil subscription after Close")
t.Error("Subscribe returned non-nil subscription after Close")
}
case <-time.After(1 * time.Second):
t.Fatalf("EthSubscribe did not return within 1s after Close")
t.Fatalf("Subscribe did not return within 1s after Close")
}
}

// This test reproduces https://github.com/ethereum/go-ethereum/issues/17837 where the
// client hangs during shutdown when Unsubscribe races with Client.Close.
func TestClientCloseUnsubscribeRace(t *testing.T) {
service := &NotificationTestService{}
server := newTestServer("eth", service)
server := newTestServer()
defer server.Stop()

for i := 0; i < 20; i++ {
client := DialInProc(server)
nc := make(chan int)
sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", 3, 1)
sub, err := client.Subscribe(context.Background(), "nftest", nc, "someSubscription", 3, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -350,7 +332,7 @@ func TestClientCloseUnsubscribeRace(t *testing.T) {
// This test checks that Client doesn't lock up when a single subscriber
// doesn't read subscription events.
func TestClientNotificationStorm(t *testing.T) {
server := newTestServer("eth", new(NotificationTestService))
server := newTestServer()
defer server.Stop()

doTest := func(count int, wantError bool) {
Expand All @@ -362,7 +344,7 @@ func TestClientNotificationStorm(t *testing.T) {
// Subscribe on the server. It will start sending many notifications
// very quickly.
nc := make(chan int)
sub, err := client.EthSubscribe(ctx, nc, "someSubscription", count, 0)
sub, err := client.Subscribe(ctx, "nftest", nc, "someSubscription", count, 0)
if err != nil {
t.Fatal("can't subscribe:", err)
}
Expand All @@ -384,7 +366,7 @@ func TestClientNotificationStorm(t *testing.T) {
return
}
var r int
err := client.CallContext(ctx, &r, "eth_echo", i)
err := client.CallContext(ctx, &r, "nftest_echo", i)
if err != nil {
if !wantError {
t.Fatalf("(%d/%d) call error: %v", i, count, err)
Expand All @@ -399,7 +381,7 @@ func TestClientNotificationStorm(t *testing.T) {
}

func TestClientHTTP(t *testing.T) {
server := newTestServer("service", new(Service))
server := newTestServer()
defer server.Stop()

client, hs := httpTestClient(server, "http", nil)
Expand All @@ -416,7 +398,7 @@ func TestClientHTTP(t *testing.T) {
for i := range results {
i := i
go func() {
errc <- client.Call(&results[i], "service_echo",
errc <- client.Call(&results[i], "test_echo",
wantResult.String, wantResult.Int, wantResult.Args)
}()
}
Expand Down Expand Up @@ -445,16 +427,16 @@ func TestClientHTTP(t *testing.T) {

func TestClientReconnect(t *testing.T) {
startServer := func(addr string) (*Server, net.Listener) {
srv := newTestServer("service", new(Service))
srv := newTestServer()
l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatal(err)
t.Fatal("can't listen:", err)
}
go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
return srv, l
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

// Start a server and corresponding client.
Expand All @@ -466,21 +448,22 @@ func TestClientReconnect(t *testing.T) {

// Perform a call. This should work because the server is up.
var resp Result
if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
if err := client.CallContext(ctx, &resp, "test_echo", "", 1, nil); err != nil {
t.Fatal(err)
}

// Shut down the server and try calling again. It shouldn't work.
// Shut down the server and allow for some cool down time so we can listen on the same
// address again.
l1.Close()
s1.Stop()
if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
time.Sleep(2 * time.Second)

// Try calling again. It shouldn't work.
if err := client.CallContext(ctx, &resp, "test_echo", "", 2, nil); err == nil {
t.Error("successful call while the server is down")
t.Logf("resp: %#v", resp)
}

// Allow for some cool down time so we can listen on the same address again.
time.Sleep(2 * time.Second)

// Start it up again and call again. The connection should be reestablished.
// We spawn multiple calls here to check whether this hangs somehow.
s2, l2 := startServer(l1.Addr().String())
Expand All @@ -493,7 +476,7 @@ func TestClientReconnect(t *testing.T) {
go func() {
<-start
var resp Result
errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
errors <- client.CallContext(ctx, &resp, "test_echo", "", 3, nil)
}()
}
close(start)
Expand All @@ -503,20 +486,12 @@ func TestClientReconnect(t *testing.T) {
errcount++
}
}
t.Log("err:", err)
t.Logf("%d errors, last error: %v", errcount, err)
if errcount > 1 {
t.Errorf("expected one error after disconnect, got %d", errcount)
}
}

func newTestServer(serviceName string, service interface{}) *Server {
server := NewServer()
if err := server.RegisterName(serviceName, service); err != nil {
panic(err)
}
return server
}

func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
// Create the HTTP server.
var hs *httptest.Server
Expand Down
Loading

0 comments on commit 245f314

Please sign in to comment.