Skip to content

Commit

Permalink
Pub/Sub routing support (dapr#227)
Browse files Browse the repository at this point in the history
Signed-off-by: Phil Kedy <[email protected]>
  • Loading branch information
pkedy authored Jan 6, 2022
1 parent 078d0cd commit 40d8a4e
Show file tree
Hide file tree
Showing 21 changed files with 558 additions and 99 deletions.
16 changes: 16 additions & 0 deletions daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
}
```

Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent.

```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}
err := s.AddTopicEventHandler(sub, importantHandler)
if err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```

### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

Expand Down
20 changes: 18 additions & 2 deletions daprdocs/content/en/go-sdk-docs/go-service/http-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ To handle events from specific topic you need to add at least one topic event ha
```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/events",
Topic: "topic1",
Route: "/events",
}
err := s.AddTopicEventHandler(sub, eventHandler)
if err != nil {
Expand All @@ -62,6 +62,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
}
```

Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent.

```go
sub := &common.Subscription{
PubsubName: "messages",
Topic: "topic1",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}
err := s.AddTopicEventHandler(sub, importantHandler)
if err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
```

### Service Invocation Handler
To handle service invocations you will need to add at least one service invocation handler before starting the service:

Expand Down
3 changes: 3 additions & 0 deletions examples/pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ require (
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

// Needed to validate SDK changes in CI/CD
replace github.com/dapr/go-sdk => ../../
2 changes: 0 additions & 2 deletions examples/pubsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
github.com/dapr/components-contrib v1.5.1-rc.1/go.mod h1:k40RvOMnDmJMSSbWZ10ajjWJ9pEuq4Z5eKxCa/yrAe8=
github.com/dapr/dapr v1.5.1 h1:AMSf8Z0bs2MsNDJYSJv03kinV/TBEm4M2DejfVTAfPw=
github.com/dapr/dapr v1.5.1/go.mod h1:2YhuJCkJ/j3WKSii7M/Ma7QlX40T6I1nsgZu2/UKEAM=
github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d h1:mAc8+pXI+soaVt/qJJf33wnsa/+FzkOOsb6UT8pHGCc=
github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d/go.mod h1:TUTITZTcalzH6uICpQYTMvwC9Hm/2XrGFjZWopYrGlo=
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233/go.mod h1:y8r0VqUNKyd6xBXp7gQjwA59wlCLGfKzL5J8iJsN09w=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
24 changes: 22 additions & 2 deletions examples/pubsub/sub/sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,31 @@ import (
// - PubsubName: is the name of the component configured in the metadata of pubsub.yaml.
// - Topic: is the name of the topic to subscribe.
// - Route: tell dapr where to request the API to publish the message to the subscriber when get a message from topic.
var sub = &common.Subscription{
// - Match: (Optional) The CEL expression to match on the CloudEvent to select this route.
// - Priority: (Optional) The priority order of the route when Match is specificed.
// If not specified, the matches are evaluated in the order in which they are added.
var defaultSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",
Route: "/orders",
}

var importantSubscription = &common.Subscription{
PubsubName: "messages",
Topic: "neworder",
Route: "/important",
Match: `event.type == "important"`,
Priority: 1,
}

func main() {
s := daprd.NewService(":8080")

if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}

Expand All @@ -48,3 +63,8 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er
log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}

func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data)
return false, nil
}
12 changes: 9 additions & 3 deletions service/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@ const (
// Service represents Dapr callback service.
type Service interface {
// AddServiceInvocationHandler appends provided service invocation handler with its name to the service.
AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error
AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn func(ctx context.Context, e *TopicEvent) (retry bool, err error)) error
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error
AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error
// RegisterActorImplFactory Register a new actor to actor runtime of go sdk
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
// Start starts service.
Start() error
// Stop stops the previously started service.
Stop() error
}

type (
ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error)
BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error)
)
8 changes: 6 additions & 2 deletions service/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ type Subscription struct {
PubsubName string `json:"pubsubname"`
// Topic is the name of the topic
Topic string `json:"topic"`
// Route is the route of the handler where HTTP topic events should be published (not used in gRPC)
Route string `json:"route"`
// Metadata is the subscription metadata
Metadata map[string]string `json:"metadata,omitempty"`
// Route is the route of the handler where HTTP topic events should be published (passed as Path in gRPC)
Route string `json:"route"`
// Match is the CEL expression to match on the CloudEvent envelope.
Match string `json:"match"`
// Priority is the priority in which to evaluate the match (lower to higher).
Priority int `json:"priority"`
}

const (
Expand Down
2 changes: 1 addition & 1 deletion service/grpc/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

// AddBindingInvocationHandler appends provided binding invocation handler with its name to the service.
func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error {
func (s *Server) AddBindingInvocationHandler(name string, fn common.BindingInvocationHandler) error {
if name == "" {
return fmt.Errorf("binding name required")
}
Expand Down
2 changes: 1 addition & 1 deletion service/grpc/invoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

// AddServiceInvocationHandler appends provided service invocation handler with its method to the service.
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
func (s *Server) AddServiceInvocationHandler(method string, fn cc.ServiceInvocationHandler) error {
if method == "" {
return fmt.Errorf("servie name required")
}
Expand Down
37 changes: 15 additions & 22 deletions service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ limitations under the License.
package grpc

import (
"context"
"net"
"os"

"github.com/pkg/errors"
"google.golang.org/grpc"

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"

"github.com/dapr/go-sdk/actor"
"github.com/dapr/go-sdk/actor/config"
"github.com/dapr/go-sdk/service/common"

"github.com/pkg/errors"

"google.golang.org/grpc"
"github.com/dapr/go-sdk/service/internal"
)

// NewService creates new Service.
Expand All @@ -49,35 +49,28 @@ func NewServiceWithListener(lis net.Listener) common.Service {

func newService(lis net.Listener) *Server {
return &Server{
listener: lis,
invokeHandlers: make(map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)),
topicSubscriptions: make(map[string]*topicEventHandler),
bindingHandlers: make(map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)),
authToken: os.Getenv(common.AppAPITokenEnvVar),
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}
}

// Server is the gRPC service implementation for Dapr.
type Server struct {
pb.UnimplementedAppCallbackServer
listener net.Listener
invokeHandlers map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)
topicSubscriptions map[string]*topicEventHandler
bindingHandlers map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)
authToken string
listener net.Listener
invokeHandlers map[string]common.ServiceInvocationHandler
topicRegistrar internal.TopicRegistrar
bindingHandlers map[string]common.BindingInvocationHandler
authToken string
}

func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {
panic("Actor is not supported by gRPC API")
}

type topicEventHandler struct {
component string
topic string
fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)
meta map[string]string
}

// Start registers the server and starts it.
func (s *Server) Start() error {
gs := grpc.NewServer()
Expand Down
66 changes: 43 additions & 23 deletions service/grpc/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,31 @@ import (

pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/go-sdk/service/internal"
)

// AddTopicEventHandler appends provided event handler with topic name to the service.
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)) error {
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if sub == nil {
return errors.New("subscription required")
}
if sub.Topic == "" {
return errors.New("topic name required")
}
if sub.PubsubName == "" {
return errors.New("pub/sub name required")
}
if fn == nil {
return fmt.Errorf("topic handler required")
}
key := fmt.Sprintf("%s-%s", sub.PubsubName, sub.Topic)
s.topicSubscriptions[key] = &topicEventHandler{
component: sub.PubsubName,
topic: sub.Topic,
fn: fn,
meta: sub.Metadata,
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
return err
}

return nil
}

// ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to.
func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
subs := make([]*pb.TopicSubscription, 0)
for _, v := range s.topicSubscriptions {
for _, v := range s.topicRegistrar {
s := v.Subscription
sub := &pb.TopicSubscription{
PubsubName: v.component,
Topic: v.topic,
Metadata: v.meta,
PubsubName: s.PubsubName,
Topic: s.Topic,
Metadata: s.Metadata,
Routes: convertRoutes(s.Routes),
}
subs = append(subs, sub)
}
Expand All @@ -68,6 +59,23 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*
}, nil
}

func convertRoutes(routes *internal.TopicRoutes) *pb.TopicRoutes {
if routes == nil {
return nil
}
rules := make([]*pb.TopicRule, len(routes.Rules))
for i, rule := range routes.Rules {
rules[i] = &pb.TopicRule{
Match: rule.Match,
Path: rule.Path,
}
}
return &pb.TopicRoutes{
Rules: rules,
Default: routes.Default,
}
}

// OnTopicEvent fired whenever a message has been published to a topic that has been subscribed.
// Dapr sends published messages in a CloudEvents v1.0 envelope.
func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
Expand All @@ -76,8 +84,8 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
// since Dapr will not get updated until long after this event expires, just drop it
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required")
}
key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic)
if h, ok := s.topicSubscriptions[key]; ok {
key := in.PubsubName + "-" + in.Topic
if sub, ok := s.topicRegistrar[key]; ok {
data := interface{}(in.Data)
if len(in.Data) > 0 {
mediaType, _, err := mime.ParseMediaType(in.DataContentType)
Expand Down Expand Up @@ -113,7 +121,19 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
Topic: in.Topic,
PubsubName: in.PubsubName,
}
retry, err := h.fn(ctx, e)
h := sub.DefaultHandler
if in.Path != "" {
if pathHandler, ok := sub.RouteHandlers[in.Path]; ok {
h = pathHandler
}
}
if h == nil {
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_RETRY}, fmt.Errorf(
"route %s for pub/sub and topic combination not configured: %s/%s",
in.Path, in.PubsubName, in.Topic,
)
}
retry, err := h(ctx, e)
if err == nil {
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_SUCCESS}, nil
}
Expand Down
Loading

0 comments on commit 40d8a4e

Please sign in to comment.