Skip to content

Commit

Permalink
client interface, test client errs
Browse files Browse the repository at this point in the history
  • Loading branch information
mchmarny committed Jun 22, 2020
1 parent 5c2c2d4 commit c9110e4
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 75 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod: ## Updates the go modules

test: mod ## Tests the entire project
go test -v -count=1 -race ./...
# go test -v -count=1 -run TestInvokeServiceWithContent ./...
# go test -v -count=1 -run NameOfSingleTest ./...

service: mod ## Runs the uncompiled example service code
dapr run --app-id serving \
Expand Down
4 changes: 2 additions & 2 deletions client/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
func (c *Client) InvokeBinding(ctx context.Context, name, op string, in []byte, min map[string]string) (out []byte, mout map[string]string, err error) {
func (c *GRPCClient) InvokeBinding(ctx context.Context, name, op string, in []byte, min map[string]string) (out []byte, mout map[string]string, err error) {
if name == "" {
return nil, nil, errors.New("nil topic")
}
Expand All @@ -35,7 +35,7 @@ func (c *Client) InvokeBinding(ctx context.Context, name, op string, in []byte,

// InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
func (c *Client) InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error {
func (c *GRPCClient) InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error {
_, _, err := c.InvokeBinding(ctx, name, operation, data, nil)
if err != nil {
return errors.Wrap(err, "error invoking output binding")
Expand Down
4 changes: 2 additions & 2 deletions client/binding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestInvokeBinding(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

mIn := make(map[string]string, 0)
Expand All @@ -23,7 +23,7 @@ func TestInvokeBinding(t *testing.T) {

func TestInvokeOutputBinding(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

err := client.InvokeOutputBinding(ctx, "serving", "EchoMethod", []byte("ping"))
Expand Down
66 changes: 57 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,59 @@ const (
)

var (
logger = log.New(os.Stdout, "", 0)
logger = log.New(os.Stdout, "", 0)
_ Client = (*GRPCClient)(nil)
)

// Client is the interface for Dapr client implementation.
type Client interface {
// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
InvokeBinding(ctx context.Context, name, op string, in []byte, min map[string]string) (out []byte, mout map[string]string, err error)

// InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
InvokeOutputBinding(ctx context.Context, name, operation string, data []byte) error

// InvokeService invokes service without raw data ([]byte).
InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error)

// InvokeServiceWithContent invokes service without content (data + content type).
InvokeServiceWithContent(ctx context.Context, serviceID, method, contentType string, data []byte) (out []byte, err error)

// PublishEvent pubishes data onto specific pubsub topic.
PublishEvent(ctx context.Context, topic string, in []byte) error

// GetSecret retreaves preconfigred secret from specified store using key.
GetSecret(ctx context.Context, store, key string, meta map[string]string) (out map[string]string, err error)

// SaveState saves the fully loaded state to store.
SaveState(ctx context.Context, s *State) error

// SaveStateData saves the raw data into store using default state options.
SaveStateData(ctx context.Context, store, key, etag string, data []byte) error

// SaveStateItem saves the single state item to store.
SaveStateItem(ctx context.Context, store string, item *StateItem) error

// GetState retreaves state from specific store using default consistency option.
GetState(ctx context.Context, store, key string) (out []byte, etag string, err error)

// GetStateWithConsistency retreaves state from specific store using provided state consistency.
GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (out []byte, etag string, err error)

// DeleteState deletes content from store using default state options.
DeleteState(ctx context.Context, store, key string) error

// DeleteStateVersion deletes content from store using provided state options and etag.
DeleteStateVersion(ctx context.Context, store, key, etag string, opts *StateOptions) error

// Close cleans up all resources created by the client.
Close()
}

// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.
func NewClient() (client *Client, err error) {
func NewClient() (client Client, err error) {
port := os.Getenv(daprPortEnvVarName)
if port == "" {
port = daprPortDefault
Expand All @@ -32,15 +80,15 @@ func NewClient() (client *Client, err error) {
}

// NewClientWithPort instantiates Dapr using specific port.
func NewClientWithPort(port string) (client *Client, err error) {
func NewClientWithPort(port string) (client Client, err error) {
if port == "" {
return nil, errors.New("nil port")
}
return NewClientWithAddress(net.JoinHostPort("127.0.0.1", port))
}

// NewClientWithAddress instantiates Dapr using specific address (inclding port).
func NewClientWithAddress(address string) (client *Client, err error) {
func NewClientWithAddress(address string) (client Client, err error) {
if address == "" {
return nil, errors.New("nil address")
}
Expand All @@ -53,21 +101,21 @@ func NewClientWithAddress(address string) (client *Client, err error) {
}

// NewClientWithConnection instantiates Dapr client using specific connection.
func NewClientWithConnection(conn *grpc.ClientConn) *Client {
return &Client{
func NewClientWithConnection(conn *grpc.ClientConn) Client {
return &GRPCClient{
connection: conn,
protoClient: pb.NewDaprClient(conn),
}
}

// Client is the Dapr client.
type Client struct {
// GRPCClient is the gRPC implementation of Dapr client.
type GRPCClient struct {
connection *grpc.ClientConn
protoClient pb.DaprClient
}

// Close cleans up all resources created by the client.
func (c *Client) Close() {
func (c *GRPCClient) Close() {
if c.connection != nil {
c.connection.Close()
}
Expand Down
21 changes: 12 additions & 9 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"net"
"testing"
"time"

"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/assert"
Expand All @@ -16,9 +15,13 @@ import (
pb "github.com/dapr/go-sdk/dapr/proto/runtime/v1"
)

const (
testBufSize = 1024 * 1024
)

func TestNewClientWithConnection(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
assert.NotNil(t, closer)
defer closer()
assert.NotNil(t, client)
Expand All @@ -31,8 +34,8 @@ func TestNewClientWithoutArgs(t *testing.T) {
assert.NotNil(t, err)
}

func getTestClient(ctx context.Context) (client *Client, closer func()) {
l := bufconn.Listen(1024 * 1024)
func getTestClient(ctx context.Context, t *testing.T) (client Client, closer func()) {
l := bufconn.Listen(testBufSize)
s := grpc.NewServer()

server := &testDaprServer{
Expand All @@ -43,18 +46,18 @@ func getTestClient(ctx context.Context) (client *Client, closer func()) {

go func() {
if err := s.Serve(l); err != nil {
logger.Fatalf("error starting test server: %s", err)
t.Fatalf("test server exited with error: %v", err)
}
}()

// wait for the server to start
time.Sleep(100 * time.Millisecond)

d := grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return l.Dial()
})

c, _ := grpc.DialContext(ctx, "", d, grpc.WithInsecure())
c, err := grpc.DialContext(ctx, "bufnet", d, grpc.WithInsecure())
if err != nil {
t.Fatalf("failed to dial bufnet: %v", err)
}

closer = func() {
l.Close()
Expand Down
6 changes: 3 additions & 3 deletions client/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/pkg/errors"
)

func (c *Client) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
if req == nil {
return nil, errors.New("nil request")
}
Expand All @@ -30,7 +30,7 @@ func (c *Client) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeSer
}

// InvokeService invokes service without raw data ([]byte).
func (c *Client) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("nil serviceID")
}
Expand All @@ -47,7 +47,7 @@ func (c *Client) InvokeService(ctx context.Context, serviceID, method string) (o
}

// InvokeServiceWithContent invokes service without content (data + content type).
func (c *Client) InvokeServiceWithContent(ctx context.Context, serviceID, method, contentType string, data []byte) (out []byte, err error) {
func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method, contentType string, data []byte) (out []byte, err error) {
if serviceID == "" {
return nil, errors.New("nil serviceID")
}
Expand Down
4 changes: 2 additions & 2 deletions client/invoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestInvokeServiceWithContent(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

resp, err := client.InvokeServiceWithContent(ctx, "serving", "EchoMethod",
Expand All @@ -21,7 +21,7 @@ func TestInvokeServiceWithContent(t *testing.T) {

func TestInvokeService(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

resp, err := client.InvokeService(ctx, "serving", "EchoMethod")
Expand Down
2 changes: 1 addition & 1 deletion client/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// PublishEvent pubishes data onto specific pubsub topic.
func (c *Client) PublishEvent(ctx context.Context, topic string, in []byte) error {
func (c *GRPCClient) PublishEvent(ctx context.Context, topic string, in []byte) error {
if topic == "" {
return errors.New("nil topic")
}
Expand Down
2 changes: 1 addition & 1 deletion client/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestPublishEvent(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

err := client.PublishEvent(ctx, "serving", []byte("ping"))
Expand Down
2 changes: 1 addition & 1 deletion client/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// GetSecret retreaves preconfigred secret from specified store using key.
func (c *Client) GetSecret(ctx context.Context, store, key string, meta map[string]string) (out map[string]string, err error) {
func (c *GRPCClient) GetSecret(ctx context.Context, store, key string, meta map[string]string) (out map[string]string, err error) {
if store == "" {
return nil, errors.New("nil store")
}
Expand Down
2 changes: 1 addition & 1 deletion client/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func GetSecret(t *testing.T) {
ctx := context.Background()
client, closer := getTestClient(ctx)
client, closer := getTestClient(ctx, t)
defer closer()

out, err := client.GetSecret(ctx, "store", "key1", nil)
Expand Down
54 changes: 27 additions & 27 deletions client/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func toProtoDuration(d time.Duration) *duration.Duration {
}

// SaveState saves the fully loaded state to store.
func (c *Client) SaveState(ctx context.Context, s *State) error {
func (c *GRPCClient) SaveState(ctx context.Context, s *State) error {
if s == nil || s.StoreName == "" || s.States == nil || len(s.States) < 1 {
return errors.New("nil or invalid state")
}
Expand All @@ -187,48 +187,53 @@ func (c *Client) SaveState(ctx context.Context, s *State) error {
return nil
}

// SaveStateItem saves the single state item to store.
func (c *Client) SaveStateItem(ctx context.Context, store string, item *StateItem) error {
// SaveStateData saves the raw data into store using default state options.
func (c *GRPCClient) SaveStateData(ctx context.Context, store, key, etag string, data []byte) error {
if store == "" {
return errors.New("nil store")
}
if item == nil {
return errors.New("nil item")
if key == "" {
return errors.New("nil key")
}

req := &State{
StoreName: store,
States: []*StateItem{item},
States: []*StateItem{
{
Key: key,
Value: data,
Etag: etag,
},
},
}

return c.SaveState(ctx, req)
}

// SaveStateData saves the raw data into store using default state options.
func (c *Client) SaveStateData(ctx context.Context, store, key, etag string, data []byte) error {
// SaveStateItem saves the single state item to store.
func (c *GRPCClient) SaveStateItem(ctx context.Context, store string, item *StateItem) error {
if store == "" {
return errors.New("nil store")
}
if key == "" {
return errors.New("nil key")
if item == nil {
return errors.New("nil item")
}

req := &State{
StoreName: store,
States: []*StateItem{
{
Key: key,
Value: data,
Etag: etag,
},
},
States: []*StateItem{item},
}

return c.SaveState(ctx, req)
}

// GetState retreaves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, store, key string) (out []byte, etag string, err error) {
return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
}

// GetStateWithConsistency retreaves state from specific store using provided state consistency.
func (c *Client) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (out []byte, etag string, err error) {
func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (out []byte, etag string, err error) {
if store == "" {
return nil, "", errors.New("nil store")
}
Expand All @@ -250,13 +255,13 @@ func (c *Client) GetStateWithConsistency(ctx context.Context, store, key string,
return result.Data, result.Etag, nil
}

// GetState retreaves state from specific store using default consistency option.
func (c *Client) GetState(ctx context.Context, store, key string) (out []byte, etag string, err error) {
return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
// DeleteState deletes content from store using default state options.
func (c *GRPCClient) DeleteState(ctx context.Context, store, key string) error {
return c.DeleteStateVersion(ctx, store, key, "", nil)
}

// DeleteStateVersion deletes content from store using provided state options and etag.
func (c *Client) DeleteStateVersion(ctx context.Context, store, key, etag string, opts *StateOptions) error {
func (c *GRPCClient) DeleteStateVersion(ctx context.Context, store, key, etag string, opts *StateOptions) error {
if store == "" {
return errors.New("nil store")
}
Expand All @@ -278,8 +283,3 @@ func (c *Client) DeleteStateVersion(ctx context.Context, store, key, etag string

return nil
}

// DeleteState deletes content from store using default state options.
func (c *Client) DeleteState(ctx context.Context, store, key string) error {
return c.DeleteStateVersion(ctx, store, key, "", nil)
}
Loading

0 comments on commit c9110e4

Please sign in to comment.