Skip to content

Commit

Permalink
Support setting routing delegate
Browse files Browse the repository at this point in the history
Modify the ContextBuilder to support setting the `rd` transport header.
This involves a few changes, but they're all covered by the added tests.
  • Loading branch information
Akshay Shah committed Jan 29, 2016
1 parent 455648d commit faae8f4
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 2 deletions.
7 changes: 7 additions & 0 deletions calloptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type CallOptions struct {

// RequestState stores request state across retry attempts.
RequestState *RequestState

// RoutingDelegate identifies a service capable of routing a request to its
// intended recipient.
RoutingDelegate string
}

var defaultCallOptions = &CallOptions{}
Expand All @@ -63,6 +67,9 @@ func (c *CallOptions) overrideHeaders(headers transportHeaders) {
if c.ShardKey != "" {
headers[ShardKey] = c.ShardKey
}
if c.RoutingDelegate != "" {
headers[RoutingDelegate] = c.RoutingDelegate
}
}

// setResponseHeaders copies some headers from the incoming call request to the response.
Expand Down
12 changes: 11 additions & 1 deletion calloptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
func TestSetHeaders(t *testing.T) {
tests := []struct {
format Format
routingDelegate string
expectedHeaders transportHeaders
}{
{
Expand All @@ -40,11 +41,20 @@ func TestSetHeaders(t *testing.T) {
format: Thrift,
expectedHeaders: transportHeaders{ArgScheme: Thrift.String()},
},
{
format: JSON,
routingDelegate: "xpr",
expectedHeaders: transportHeaders{
ArgScheme: JSON.String(),
RoutingDelegate: "xpr",
},
},
}

for _, tt := range tests {
callOpts := &CallOptions{
Format: tt.format,
Format: tt.format,
RoutingDelegate: tt.routingDelegate,
}
headers := make(transportHeaders)
callOpts.setHeaders(headers)
Expand Down
4 changes: 4 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type IncomingCall interface {
// ShardKey returns the shard key from the ShardKey transport header.
ShardKey() string

// RoutingDelegate returns the routing delegate from RoutingDelegate
// transport header.
RoutingDelegate() string

// RemotePeer returns the caller's peer information.
// If the caller is an ephemeral peer, then the HostPort cannot be used to make new
// connections to the caller.
Expand Down
8 changes: 8 additions & 0 deletions context_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func (cb *ContextBuilder) SetFormat(f Format) *ContextBuilder {
return cb
}

func (cb *ContextBuilder) SetRoutingDelegate(rd string) *ContextBuilder {
if cb.CallOptions == nil {
cb.CallOptions = new(CallOptions)
}
cb.CallOptions.RoutingDelegate = rd
return cb
}

// DisableTracing disables tracing.
func (cb *ContextBuilder) DisableTracing() *ContextBuilder {
cb.TracingDisabled = true
Expand Down
23 changes: 23 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ func TestNewContextTimeoutZero(t *testing.T) {
assert.True(t, deadline.Sub(time.Now()) <= 0, "Deadline should be Now or earlier")
}

func TestRoutingDelegatePropagates(t *testing.T) {
WithVerifiedServer(t, nil, func(ch *Channel, hostPort string) {
peerInfo := ch.PeerInfo()
testutils.RegisterFunc(ch, "test", func(ctx context.Context, args *raw.Args) (*raw.Res, error) {
return &raw.Res{
Arg3: []byte(CurrentCall(ctx).RoutingDelegate()),
}, nil
})

ctx, cancel := NewContextBuilder(time.Second).Build()
defer cancel()
_, arg3, _, err := raw.Call(ctx, ch, peerInfo.HostPort, peerInfo.ServiceName, "test", nil, nil)
assert.NoError(t, err, "Call failed")
assert.Equal(t, []byte(""), arg3, "Expected no routing delegate header")

ctx, cancel = NewContextBuilder(time.Second).SetRoutingDelegate("xpr").Build()
defer cancel()
_, arg3, _, err = raw.Call(ctx, ch, peerInfo.HostPort, peerInfo.ServiceName, "test", nil, nil)
assert.NoError(t, err, "Call failed")
assert.Equal(t, "xpr", string(arg3), "Expected routing delegate header to be set")
})
}

func TestShardKeyPropagates(t *testing.T) {
WithVerifiedServer(t, nil, func(ch *Channel, hostPort string) {
peerInfo := ch.PeerInfo()
Expand Down
5 changes: 5 additions & 0 deletions inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ func (call *InboundCall) ShardKey() string {
return call.headers[ShardKey]
}

// RoutingDelegate returns the routing delegate from the RoutingDelegate transport header.
func (call *InboundCall) RoutingDelegate() string {
return call.headers[RoutingDelegate]
}

// RemotePeer returns the caller's peer info.
func (call *InboundCall) RemotePeer() PeerInfo {
return call.conn.RemotePeerInfo()
Expand Down
4 changes: 4 additions & 0 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ const (

// SpeculativeExecution header specifies the number of nodes on which to run the request.
SpeculativeExecution TransportHeaderName = "se"

// RoutingDelegate header identifies an intermediate service which knows
// how to route the request to the intended recipient.
RoutingDelegate TransportHeaderName = "rd"
)

// transportHeaders are passed as part of a CallReq/CallRes
Expand Down
10 changes: 9 additions & 1 deletion testutils/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ type FakeIncomingCall struct {
// ShardKeyF is the intended destination for this call.
ShardKeyF string

// RemotePeer is the calling service's peer info.
// RemotePeerF is the calling service's peer info.
RemotePeerF tchannel.PeerInfo

// RoutingDelegateF is the routing delegate.
RoutingDelegateF string
}

// CallerName returns the caller name as specified in the fake call.
Expand All @@ -48,6 +51,11 @@ func (f *FakeIncomingCall) ShardKey() string {
return f.ShardKeyF
}

// RoutingDelegate returns the routing delegate as specified in the fake call.
func (f *FakeIncomingCall) RoutingDelegate() string {
return f.RoutingDelegateF
}

// RemotePeer returns the caller's peer info.
func (f *FakeIncomingCall) RemotePeer() tchannel.PeerInfo {
return f.RemotePeerF
Expand Down

0 comments on commit faae8f4

Please sign in to comment.