forked from dapr/go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
260 lines (208 loc) · 9.7 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package client
import (
"context"
"log"
"net"
"os"
"sync"
"time"
"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/config"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/emptypb"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
// used to import codec implements.
_ "github.com/dapr/go-sdk/actor/codec/impl"
)
const (
daprPortDefault = "50001"
daprPortEnvVarName = "DAPR_GRPC_PORT"
traceparentKey = "traceparent"
apiTokenKey = "dapr-api-token" /* #nosec */
apiTokenEnvVarName = "DAPR_API_TOKEN" /* #nosec */
)
var (
logger = log.New(os.Stdout, "", 0)
_ Client = (*GRPCClient)(nil)
defaultClient Client
doOnce sync.Once
)
// Client is the interface for Dapr client implementation.
type Client interface {
// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
InvokeBinding(ctx context.Context, in *InvokeBindingRequest) (out *BindingEvent, err error)
// InvokeOutputBinding invokes configured Dapr binding with data.InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
InvokeOutputBinding(ctx context.Context, in *InvokeBindingRequest) error
// InvokeMethod invokes service without raw data
InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)
// InvokeMethodWithContent invokes service with content
InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)
// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
// PublishEvent publishes data onto topic in specific pubsub component.
PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error
// PublishEventfromCustomContent serializes an struct and publishes its contents as data (JSON) onto topic in specific pubsub component.
// Deprecated: This method is deprecated and will be removed in a future version of the SDK. Please use `PublishEvent` instead.
PublishEventfromCustomContent(ctx context.Context, pubsubName, topicName string, data interface{}) error
// GetSecret retrieves preconfigured secret from specified store using key.
GetSecret(ctx context.Context, storeName, key string, meta map[string]string) (data map[string]string, err error)
// GetBulkSecret retrieves all preconfigured secrets for this application.
GetBulkSecret(ctx context.Context, storeName string, meta map[string]string) (data map[string]map[string]string, err error)
// SaveState saves the raw data into store using default state options.
SaveState(ctx context.Context, storeName, key string, data []byte, so ...StateOption) error
// SaveBulkState saves multiple state item to store with specified options.
SaveBulkState(ctx context.Context, storeName string, items ...*SetStateItem) error
// GetState retrieves state from specific store using default consistency option.
GetState(ctx context.Context, storeName, key string) (item *StateItem, err error)
// GetStateWithConsistency retrieves state from specific store using provided state consistency.
GetStateWithConsistency(ctx context.Context, storeName, key string, meta map[string]string, sc StateConsistency) (item *StateItem, err error)
// GetBulkState retrieves state for multiple keys from specific store.
GetBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string, parallelism int32) ([]*BulkStateItem, error)
// DeleteState deletes content from store using default state options.
DeleteState(ctx context.Context, storeName, key string) error
// DeleteStateWithETag deletes content from store using provided state options and etag.
DeleteStateWithETag(ctx context.Context, storeName, key string, etag *ETag, meta map[string]string, opts *StateOptions) error
// ExecuteStateTransaction provides way to execute multiple operations on a specified store.
ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error
// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkState(ctx context.Context, storeName string, keys []string) error
// DeleteBulkState deletes content for multiple keys from store.
DeleteBulkStateItems(ctx context.Context, storeName string, items []*DeleteStateItem) error
// Shutdown the sidecar.
Shutdown(ctx context.Context) error
// WithTraceID adds existing trace ID to the outgoing context.
WithTraceID(ctx context.Context, id string) context.Context
// WithAuthToken sets Dapr API token on the instantiated client.
WithAuthToken(token string)
// Close cleans up all resources created by the client.
Close()
// RegisterActorTimer registers an actor timer.
RegisterActorTimer(ctx context.Context, req *RegisterActorTimerRequest) error
// UnregisterActorTimer unregisters an actor timer.
UnregisterActorTimer(ctx context.Context, req *UnregisterActorTimerRequest) error
// RegisterActorReminder registers an actor reminder.
RegisterActorReminder(ctx context.Context, req *RegisterActorReminderRequest) error
// UnregisterActorReminder unregisters an actor reminder.
UnregisterActorReminder(ctx context.Context, req *UnregisterActorReminderRequest) error
// InvokeActor calls a method on an actor.
InvokeActor(ctx context.Context, req *InvokeActorRequest) (*InvokeActorResponse, error)
// GetActorState get actor state
GetActorState(ctx context.Context, req *GetActorStateRequest) (data *GetActorStateResponse, err error)
// SaveStateTransactionally save actor state
SaveStateTransactionally(ctx context.Context, actorType, actorID string, operations []*ActorStateOperation) error
// ImplActorClientStub is to impl user defined actor client stub
ImplActorClientStub(actorClientStub actor.Client, opt ...config.Option)
}
// NewClient instantiates Dapr client using DAPR_GRPC_PORT environment variable as port.
// Note, this default factory function creates Dapr client only once. All subsequent invocations
// will return the already created instance. To create multiple instances of the Dapr client,
// use one of the parameterized factory functions:
// NewClientWithPort(port string) (client Client, err error)
// NewClientWithAddress(address string) (client Client, err error)
// NewClientWithConnection(conn *grpc.ClientConn) Client
func NewClient() (client Client, err error) {
port := os.Getenv(daprPortEnvVarName)
if port == "" {
port = daprPortDefault
}
var onceErr error
doOnce.Do(func() {
c, err := NewClientWithPort(port)
onceErr = errors.Wrap(err, "error creating default client")
defaultClient = c
})
return defaultClient, onceErr
}
// NewClientWithPort instantiates Dapr using specific port.
func NewClientWithPort(port string) (client Client, err error) {
if port == "" {
return nil, errors.New("nil port")
}
return NewClientWithAddress(net.JoinHostPort("127.0.0.1", port))
}
// NewClientWithAddress instantiates Dapr using specific address (including port).
func NewClientWithAddress(address string) (client Client, err error) {
if address == "" {
return nil, errors.New("nil address")
}
logger.Printf("dapr client initializing for: %s", address)
ctx, ctxCancel := context.WithTimeout(context.Background(), 1*time.Second)
conn, err := grpc.DialContext(
ctx,
address,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
ctxCancel()
return nil, errors.Wrapf(err, "error creating connection to '%s': %v", address, err)
}
if hasToken := os.Getenv(apiTokenEnvVarName); hasToken != "" {
logger.Println("client uses API token")
}
return newClientWithConnectionAndCancelFunc(conn, ctxCancel), nil
}
// NewClientWithConnection instantiates Dapr client using specific connection.
func NewClientWithConnection(conn *grpc.ClientConn) Client {
return newClientWithConnectionAndCancelFunc(conn, func() {})
}
func newClientWithConnectionAndCancelFunc(
conn *grpc.ClientConn,
cancelFunc context.CancelFunc,
) Client {
return &GRPCClient{
connection: conn,
ctxCancelFunc: cancelFunc,
protoClient: pb.NewDaprClient(conn),
authToken: os.Getenv(apiTokenEnvVarName),
}
}
// GRPCClient is the gRPC implementation of Dapr client.
type GRPCClient struct {
connection *grpc.ClientConn
ctxCancelFunc context.CancelFunc
protoClient pb.DaprClient
authToken string
mux sync.Mutex
}
// Close cleans up all resources created by the client.
func (c *GRPCClient) Close() {
c.ctxCancelFunc()
if c.connection != nil {
c.connection.Close()
}
}
// WithAuthToken sets Dapr API token on the instantiated client.
// Allows empty string to reset token on existing client.
func (c *GRPCClient) WithAuthToken(token string) {
c.mux.Lock()
c.authToken = token
c.mux.Unlock()
}
// WithTraceID adds existing trace ID to the outgoing context.
func (c *GRPCClient) WithTraceID(ctx context.Context, id string) context.Context {
if id == "" {
return ctx
}
logger.Printf("using trace parent ID: %s", id)
md := metadata.Pairs(traceparentKey, id)
return metadata.NewOutgoingContext(ctx, md)
}
func (c *GRPCClient) withAuthToken(ctx context.Context) context.Context {
if c.authToken == "" {
return ctx
}
return metadata.NewOutgoingContext(ctx, metadata.Pairs(apiTokenKey, c.authToken))
}
// Shutdown the sidecar.
func (c *GRPCClient) Shutdown(ctx context.Context) error {
_, err := c.protoClient.Shutdown(c.withAuthToken(ctx), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "error shutting down the sidecar")
}
return nil
}