forked from dapr/go-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
83 lines (69 loc) · 2.57 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package client
import (
"context"
"encoding/json"
"log"
"github.com/pkg/errors"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)
// PublishEventOption is a functional option for PublishEvent. Each option
// receives the outgoing PublishEventRequest and may modify it before the
// request is sent.
type PublishEventOption func(*pb.PublishEventRequest)
// PublishEvent publishes data onto the given topic of the given pubsub component.
//
// Both pubsubName and topicName must be non-empty; otherwise an error is
// returned before any request is sent. The payload is derived from data:
//   - nil: the request is sent without a payload,
//   - []byte: used as-is,
//   - string: converted to bytes,
//   - any other value: encoded with json.Marshal.
//
// opts are applied to the request, in order, before the payload is set; nil
// options are skipped. For JSON-encoded payloads the content type defaults to
// "application/json", but a content type set explicitly through an option
// (for example PublishEventWithContentType) is kept rather than overwritten.
//
// An error is returned when validation fails, when JSON encoding fails, or
// when the underlying PublishEvent call reports an error.
func (c *GRPCClient) PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error {
	if pubsubName == "" {
		return errors.New("pubsubName name required")
	}
	if topicName == "" {
		return errors.New("topic name required")
	}

	request := &pb.PublishEventRequest{
		PubsubName: pubsubName,
		Topic:      topicName,
	}
	for _, opt := range opts {
		// Calling a nil function value would panic; treat it as a no-op.
		if opt == nil {
			continue
		}
		opt(request)
	}

	switch d := data.(type) {
	case nil:
		// No payload to attach.
	case []byte:
		request.Data = d
	case string:
		request.Data = []byte(d)
	default:
		payload, err := json.Marshal(d)
		if err != nil {
			return errors.WithMessage(err, "error serializing input struct")
		}
		request.Data = payload
		// Only fall back to JSON when the caller did not choose a content
		// type, so an explicit option is not silently overridden.
		if request.DataContentType == "" {
			request.DataContentType = "application/json"
		}
	}

	if _, err := c.protoClient.PublishEvent(c.withAuthToken(ctx), request); err != nil {
		return errors.Wrapf(err, "error publishing event unto %s topic", topicName)
	}
	return nil
}
// PublishEventWithContentType returns a PublishEventOption that sets the
// request's DataContentType to contentType. Pass it to PublishEvent to declare
// an explicit Content-Type for the published data.
func PublishEventWithContentType(contentType string) PublishEventOption {
	setContentType := func(req *pb.PublishEventRequest) {
		req.DataContentType = contentType
	}
	return setContentType
}
// PublishEventWithMetadata returns a PublishEventOption that assigns metadata
// to the request's Metadata field. The map is stored as given (it is not
// copied) and replaces whatever Metadata the request held before.
func PublishEventWithMetadata(metadata map[string]string) PublishEventOption {
	var applyMetadata PublishEventOption = func(req *pb.PublishEventRequest) {
		req.Metadata = metadata
	}
	return applyMetadata
}
// PublishEventfromCustomContent JSON-encodes data and publishes the encoded
// bytes onto the given topic of the given pubsub component, with the content
// type set to "application/json". Each call also logs a deprecation notice.
//
// Deprecated: This method is deprecated and will be removed in a future version of the SDK. Please use `PublishEvent` instead.
func (c *GRPCClient) PublishEventfromCustomContent(ctx context.Context, pubsubName, topicName string, data interface{}) error {
	log.Println("DEPRECATED: client.PublishEventfromCustomContent is deprecated and will be removed in a future version of the SDK. Please use `PublishEvent` instead.")

	// Marshal here instead of delegating to PublishEvent, which would send a
	// []byte or string argument as-is rather than JSON-encoding it.
	payload, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return errors.WithMessage(marshalErr, "error serializing input struct")
	}

	asJSON := PublishEventWithContentType("application/json")
	return c.PublishEvent(ctx, pubsubName, topicName, payload, asJSON)
}