From 40d8a4eabe2bc6244a0de6789f74d2fe6e302db9 Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Thu, 6 Jan 2022 13:19:24 -0500 Subject: [PATCH] Pub/Sub routing support (#227) Signed-off-by: Phil Kedy --- .../en/go-sdk-docs/go-service/grpc-service.md | 16 +++ .../en/go-sdk-docs/go-service/http-service.md | 20 +++- examples/pubsub/go.mod | 3 + examples/pubsub/go.sum | 2 - examples/pubsub/sub/sub.go | 24 +++- service/common/service.go | 12 +- service/common/type.go | 8 +- service/grpc/binding.go | 2 +- service/grpc/invoke.go | 2 +- service/grpc/service.go | 37 +++--- service/grpc/topic.go | 66 +++++++---- service/grpc/topic_test.go | 42 ++++++- service/http/binding.go | 3 +- service/http/invoke.go | 3 +- service/http/service.go | 18 +-- service/http/topic.go | 37 ++---- service/http/topic_test.go | 49 ++++++++ service/internal/topicregistrar.go | 56 +++++++++ service/internal/topicregistrar_test.go | 73 ++++++++++++ service/internal/topicsubscription.go | 112 ++++++++++++++++++ service/internal/topicsubscription_test.go | 72 +++++++++++ 21 files changed, 558 insertions(+), 99 deletions(-) create mode 100644 service/internal/topicregistrar.go create mode 100644 service/internal/topicregistrar_test.go create mode 100644 service/internal/topicsubscription.go create mode 100644 service/internal/topicsubscription_test.go diff --git a/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md b/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md index 155f720f..4ac37c39 100644 --- a/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md +++ b/daprdocs/content/en/go-sdk-docs/go-service/grpc-service.md @@ -67,6 +67,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er } ``` +Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent. + +```go +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + Route: "/important", + Match: `event.type == "important"`, + Priority: 1, +} +err := s.AddTopicEventHandler(sub, importantHandler) +if err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} +``` + ### Service Invocation Handler To handle service invocations you will need to add at least one service invocation handler before starting the service: diff --git a/daprdocs/content/en/go-sdk-docs/go-service/http-service.md b/daprdocs/content/en/go-sdk-docs/go-service/http-service.md index e812ce84..643f24f2 100644 --- a/daprdocs/content/en/go-sdk-docs/go-service/http-service.md +++ b/daprdocs/content/en/go-sdk-docs/go-service/http-service.md @@ -43,8 +43,8 @@ To handle events from specific topic you need to add at least one topic event ha ```go sub := &common.Subscription{ PubsubName: "messages", - Topic: "topic1", - Route: "/events", + Topic: "topic1", + Route: "/events", } err := s.AddTopicEventHandler(sub, eventHandler) if err != nil { @@ -62,6 +62,22 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er } ``` +Optionally, you can use [routing rules](https://docs.dapr.io/developing-applications/building-blocks/pubsub/howto-route-messages/) to send messages to different handlers based on the contents of the CloudEvent. + +```go +sub := &common.Subscription{ + PubsubName: "messages", + Topic: "topic1", + Route: "/important", + Match: `event.type == "important"`, + Priority: 1, +} +err := s.AddTopicEventHandler(sub, importantHandler) +if err != nil { + log.Fatalf("error adding topic subscription: %v", err) +} +``` + ### Service Invocation Handler To handle service invocations you will need to add at least one service invocation handler before starting the service: diff --git a/examples/pubsub/go.mod b/examples/pubsub/go.mod index 3ac6f791..75337ac8 100644 --- a/examples/pubsub/go.mod +++ b/examples/pubsub/go.mod @@ -17,3 +17,6 @@ require ( google.golang.org/protobuf v1.26.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) + +// Needed to validate SDK changes in CI/CD +replace github.com/dapr/go-sdk => ../../ diff --git a/examples/pubsub/go.sum b/examples/pubsub/go.sum index d6155e25..26a4e272 100644 --- a/examples/pubsub/go.sum +++ b/examples/pubsub/go.sum @@ -232,8 +232,6 @@ github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E github.com/dapr/components-contrib v1.5.1-rc.1/go.mod h1:k40RvOMnDmJMSSbWZ10ajjWJ9pEuq4Z5eKxCa/yrAe8= github.com/dapr/dapr v1.5.1 h1:AMSf8Z0bs2MsNDJYSJv03kinV/TBEm4M2DejfVTAfPw= github.com/dapr/dapr v1.5.1/go.mod h1:2YhuJCkJ/j3WKSii7M/Ma7QlX40T6I1nsgZu2/UKEAM= -github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d h1:mAc8+pXI+soaVt/qJJf33wnsa/+FzkOOsb6UT8pHGCc= -github.com/dapr/go-sdk v1.3.1-0.20211214200612-a38be4e38b7d/go.mod h1:TUTITZTcalzH6uICpQYTMvwC9Hm/2XrGFjZWopYrGlo= github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233/go.mod h1:y8r0VqUNKyd6xBXp7gQjwA59wlCLGfKzL5J8iJsN09w= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/examples/pubsub/sub/sub.go b/examples/pubsub/sub/sub.go index a1b4c641..9fcb1d25 100644 --- a/examples/pubsub/sub/sub.go +++ b/examples/pubsub/sub/sub.go @@ -26,16 +26,31 @@ import ( // - PubsubName: is the name of the component configured in the metadata of pubsub.yaml. // - Topic: is the name of the topic to subscribe. // - Route: tell dapr where to request the API to publish the message to the subscriber when get a message from topic. -var sub = &common.Subscription{ +// - Match: (Optional) The CEL expression to match on the CloudEvent to select this route. +// - Priority: (Optional) The priority order of the route when Match is specificed. +// If not specified, the matches are evaluated in the order in which they are added. +var defaultSubscription = &common.Subscription{ PubsubName: "messages", Topic: "neworder", Route: "/orders", } +var importantSubscription = &common.Subscription{ + PubsubName: "messages", + Topic: "neworder", + Route: "/important", + Match: `event.type == "important"`, + Priority: 1, +} + func main() { s := daprd.NewService(":8080") - if err := s.AddTopicEventHandler(sub, eventHandler); err != nil { + if err := s.AddTopicEventHandler(defaultSubscription, eventHandler); err != nil { + log.Fatalf("error adding topic subscription: %v", err) + } + + if err := s.AddTopicEventHandler(importantSubscription, importantEventHandler); err != nil { log.Fatalf("error adding topic subscription: %v", err) } @@ -48,3 +63,8 @@ func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err er log.Printf("event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data) return false, nil } + +func importantEventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + log.Printf("important event - PubsubName: %s, Topic: %s, ID: %s, Data: %s", e.PubsubName, e.Topic, e.ID, e.Data) + return false, nil +} diff --git a/service/common/service.go b/service/common/service.go index 1b4eda9f..cb7f90fa 100644 --- a/service/common/service.go +++ b/service/common/service.go @@ -29,12 +29,12 @@ const ( // Service represents Dapr callback service. type Service interface { // AddServiceInvocationHandler appends provided service invocation handler with its name to the service. - AddServiceInvocationHandler(name string, fn func(ctx context.Context, in *InvocationEvent) (out *Content, err error)) error + AddServiceInvocationHandler(name string, fn ServiceInvocationHandler) error // AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service. // Note, retries are only considered when there is an error. Lack of error is considered as a success - AddTopicEventHandler(sub *Subscription, fn func(ctx context.Context, e *TopicEvent) (retry bool, err error)) error + AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. - AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *BindingEvent) (out []byte, err error)) error + AddBindingInvocationHandler(name string, fn BindingInvocationHandler) error // RegisterActorImplFactory Register a new actor to actor runtime of go sdk RegisterActorImplFactory(f actor.Factory, opts ...config.Option) // Start starts service. @@ -42,3 +42,9 @@ type Service interface { // Stop stops the previously started service. Stop() error } + +type ( + ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error) + TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error) + BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error) +) diff --git a/service/common/type.go b/service/common/type.go index 05b8f74f..d1f5e7db 100644 --- a/service/common/type.go +++ b/service/common/type.go @@ -91,10 +91,14 @@ type Subscription struct { PubsubName string `json:"pubsubname"` // Topic is the name of the topic Topic string `json:"topic"` - // Route is the route of the handler where HTTP topic events should be published (not used in gRPC) - Route string `json:"route"` // Metadata is the subscription metadata Metadata map[string]string `json:"metadata,omitempty"` + // Route is the route of the handler where HTTP topic events should be published (passed as Path in gRPC) + Route string `json:"route"` + // Match is the CEL expression to match on the CloudEvent envelope. + Match string `json:"match"` + // Priority is the priority in which to evaluate the match (lower to higher). + Priority int `json:"priority"` } const ( diff --git a/service/grpc/binding.go b/service/grpc/binding.go index 2ace2a31..60702e7b 100644 --- a/service/grpc/binding.go +++ b/service/grpc/binding.go @@ -25,7 +25,7 @@ import ( ) // AddBindingInvocationHandler appends provided binding invocation handler with its name to the service. -func (s *Server) AddBindingInvocationHandler(name string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error { +func (s *Server) AddBindingInvocationHandler(name string, fn common.BindingInvocationHandler) error { if name == "" { return fmt.Errorf("binding name required") } diff --git a/service/grpc/invoke.go b/service/grpc/invoke.go index ef08e6d3..4cb4256c 100644 --- a/service/grpc/invoke.go +++ b/service/grpc/invoke.go @@ -26,7 +26,7 @@ import ( ) // AddServiceInvocationHandler appends provided service invocation handler with its method to the service. -func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error { +func (s *Server) AddServiceInvocationHandler(method string, fn cc.ServiceInvocationHandler) error { if method == "" { return fmt.Errorf("servie name required") } diff --git a/service/grpc/service.go b/service/grpc/service.go index 008ca24a..2fec6652 100644 --- a/service/grpc/service.go +++ b/service/grpc/service.go @@ -14,18 +14,18 @@ limitations under the License. package grpc import ( - "context" "net" "os" + "github.com/pkg/errors" + "google.golang.org/grpc" + pb "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/go-sdk/actor" "github.com/dapr/go-sdk/actor/config" "github.com/dapr/go-sdk/service/common" - - "github.com/pkg/errors" - - "google.golang.org/grpc" + "github.com/dapr/go-sdk/service/internal" ) // NewService creates new Service. @@ -49,35 +49,28 @@ func NewServiceWithListener(lis net.Listener) common.Service { func newService(lis net.Listener) *Server { return &Server{ - listener: lis, - invokeHandlers: make(map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)), - topicSubscriptions: make(map[string]*topicEventHandler), - bindingHandlers: make(map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)), - authToken: os.Getenv(common.AppAPITokenEnvVar), + listener: lis, + invokeHandlers: make(map[string]common.ServiceInvocationHandler), + topicRegistrar: make(internal.TopicRegistrar), + bindingHandlers: make(map[string]common.BindingInvocationHandler), + authToken: os.Getenv(common.AppAPITokenEnvVar), } } // Server is the gRPC service implementation for Dapr. type Server struct { pb.UnimplementedAppCallbackServer - listener net.Listener - invokeHandlers map[string]func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error) - topicSubscriptions map[string]*topicEventHandler - bindingHandlers map[string]func(ctx context.Context, in *common.BindingEvent) (out []byte, err error) - authToken string + listener net.Listener + invokeHandlers map[string]common.ServiceInvocationHandler + topicRegistrar internal.TopicRegistrar + bindingHandlers map[string]common.BindingInvocationHandler + authToken string } func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) { panic("Actor is not supported by gRPC API") } -type topicEventHandler struct { - component string - topic string - fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) - meta map[string]string -} - // Start registers the server and starts it. func (s *Server) Start() error { gs := grpc.NewServer() diff --git a/service/grpc/topic.go b/service/grpc/topic.go index d5afe872..7b565ef1 100644 --- a/service/grpc/topic.go +++ b/service/grpc/topic.go @@ -25,40 +25,31 @@ import ( pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" ) // AddTopicEventHandler appends provided event handler with topic name to the service. -func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)) error { +func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { if sub == nil { return errors.New("subscription required") } - if sub.Topic == "" { - return errors.New("topic name required") - } - if sub.PubsubName == "" { - return errors.New("pub/sub name required") - } - if fn == nil { - return fmt.Errorf("topic handler required") - } - key := fmt.Sprintf("%s-%s", sub.PubsubName, sub.Topic) - s.topicSubscriptions[key] = &topicEventHandler{ - component: sub.PubsubName, - topic: sub.Topic, - fn: fn, - meta: sub.Metadata, + if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil { + return err } + return nil } // ListTopicSubscriptions is called by Dapr to get the list of topics in a pubsub component the app wants to subscribe to. func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) { subs := make([]*pb.TopicSubscription, 0) - for _, v := range s.topicSubscriptions { + for _, v := range s.topicRegistrar { + s := v.Subscription sub := &pb.TopicSubscription{ - PubsubName: v.component, - Topic: v.topic, - Metadata: v.meta, + PubsubName: s.PubsubName, + Topic: s.Topic, + Metadata: s.Metadata, + Routes: convertRoutes(s.Routes), } subs = append(subs, sub) } @@ -68,6 +59,23 @@ func (s *Server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (* }, nil } +func convertRoutes(routes *internal.TopicRoutes) *pb.TopicRoutes { + if routes == nil { + return nil + } + rules := make([]*pb.TopicRule, len(routes.Rules)) + for i, rule := range routes.Rules { + rules[i] = &pb.TopicRule{ + Match: rule.Match, + Path: rule.Path, + } + } + return &pb.TopicRoutes{ + Rules: rules, + Default: routes.Default, + } +} + // OnTopicEvent fired whenever a message has been published to a topic that has been subscribed. // Dapr sends published messages in a CloudEvents v1.0 envelope. func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) { @@ -76,8 +84,8 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p // since Dapr will not get updated until long after this event expires, just drop it return &pb.TopicEventResponse{Status: pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required") } - key := fmt.Sprintf("%s-%s", in.PubsubName, in.Topic) - if h, ok := s.topicSubscriptions[key]; ok { + key := in.PubsubName + "-" + in.Topic + if sub, ok := s.topicRegistrar[key]; ok { data := interface{}(in.Data) if len(in.Data) > 0 { mediaType, _, err := mime.ParseMediaType(in.DataContentType) @@ -113,7 +121,19 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p Topic: in.Topic, PubsubName: in.PubsubName, } - retry, err := h.fn(ctx, e) + h := sub.DefaultHandler + if in.Path != "" { + if pathHandler, ok := sub.RouteHandlers[in.Path]; ok { + h = pathHandler + } + } + if h == nil { + return &pb.TopicEventResponse{Status: pb.TopicEventResponse_RETRY}, fmt.Errorf( + "route %s for pub/sub and topic combination not configured: %s/%s", + in.Path, in.PubsubName, in.Topic, + ) + } + retry, err := h(ctx, e) if err == nil { return &pb.TopicEventResponse{Status: pb.TopicEventResponse_SUCCESS}, nil } diff --git a/service/grpc/topic_test.go b/service/grpc/topic_test.go index e4bf1b5f..ffddb895 100644 --- a/service/grpc/topic_test.go +++ b/service/grpc/topic_test.go @@ -44,17 +44,51 @@ func TestTopicErrors(t *testing.T) { } func TestTopicSubscriptionList(t *testing.T) { - sub := &common.Subscription{ + server := getTestServer() + + // Add default route. + sub1 := &common.Subscription{ PubsubName: "messages", Topic: "test", + Route: "/test", } - server := getTestServer() - err := server.AddTopicEventHandler(sub, eventHandler) + err := server.AddTopicEventHandler(sub1, eventHandler) assert.Nil(t, err) resp, err := server.ListTopicSubscriptions(context.Background(), &empty.Empty{}) assert.NoError(t, err) assert.NotNil(t, resp) - assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") + if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") { + sub := resp.Subscriptions[0] + assert.Equal(t, "messages", sub.PubsubName) + assert.Equal(t, "test", sub.Topic) + assert.Nil(t, sub.Routes) + } + + // Add routing rule. + sub2 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + } + err = server.AddTopicEventHandler(sub2, eventHandler) + assert.Nil(t, err) + resp, err = server.ListTopicSubscriptions(context.Background(), &empty.Empty{}) + assert.NoError(t, err) + assert.NotNil(t, resp) + if assert.Lenf(t, resp.Subscriptions, 1, "expected 1 handlers") { + sub := resp.Subscriptions[0] + assert.Equal(t, "messages", sub.PubsubName) + assert.Equal(t, "test", sub.Topic) + if assert.NotNil(t, sub.Routes) { + assert.Equal(t, "/test", sub.Routes.Default) + if assert.Len(t, sub.Routes.Rules, 1) { + rule := sub.Routes.Rules[0] + assert.Equal(t, "/other", rule.Path) + assert.Equal(t, `event.type == "other"`, rule.Match) + } + } + } } // go test -timeout 30s ./service/grpc -count 1 -run ^TestTopic$ diff --git a/service/http/binding.go b/service/http/binding.go index ceae3f9f..fc8175ac 100644 --- a/service/http/binding.go +++ b/service/http/binding.go @@ -14,7 +14,6 @@ limitations under the License. package http import ( - "context" "fmt" "io/ioutil" "net/http" @@ -24,7 +23,7 @@ import ( ) // AddBindingInvocationHandler appends provided binding invocation handler with its route to the service. -func (s *Server) AddBindingInvocationHandler(route string, fn func(ctx context.Context, in *common.BindingEvent) (out []byte, err error)) error { +func (s *Server) AddBindingInvocationHandler(route string, fn common.BindingInvocationHandler) error { if route == "" { return fmt.Errorf("binding route required") } diff --git a/service/http/invoke.go b/service/http/invoke.go index d9af42b3..2c51103a 100644 --- a/service/http/invoke.go +++ b/service/http/invoke.go @@ -14,7 +14,6 @@ limitations under the License. package http import ( - "context" "fmt" "io/ioutil" "net/http" @@ -24,7 +23,7 @@ import ( ) // AddServiceInvocationHandler appends provided service invocation handler with its route to the service. -func (s *Server) AddServiceInvocationHandler(route string, fn func(ctx context.Context, in *common.InvocationEvent) (out *common.Content, err error)) error { +func (s *Server) AddServiceInvocationHandler(route string, fn common.ServiceInvocationHandler) error { if route == "" { return fmt.Errorf("service route required") } diff --git a/service/http/service.go b/service/http/service.go index a6c2f684..7823e8fe 100644 --- a/service/http/service.go +++ b/service/http/service.go @@ -24,8 +24,8 @@ import ( "github.com/dapr/go-sdk/actor" "github.com/dapr/go-sdk/actor/config" "github.com/dapr/go-sdk/actor/runtime" - "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" ) // NewService creates new Service. @@ -48,19 +48,19 @@ func newServer(address string, router *mux.Router) *Server { Addr: address, Handler: router, }, - mux: router, - topicSubscriptions: make([]*common.Subscription, 0), - authToken: os.Getenv(common.AppAPITokenEnvVar), + mux: router, + topicRegistrar: make(internal.TopicRegistrar), + authToken: os.Getenv(common.AppAPITokenEnvVar), } } // Server is the HTTP server wrapping mux many Dapr helpers. type Server struct { - address string - mux *mux.Router - httpServer *http.Server - topicSubscriptions []*common.Subscription - authToken string + address string + mux *mux.Router + httpServer *http.Server + topicRegistrar internal.TopicRegistrar + authToken string } func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) { diff --git a/service/http/topic.go b/service/http/topic.go index 0d7cd9d1..27c4bb29 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -14,22 +14,18 @@ limitations under the License. package http import ( - "context" "encoding/base64" "encoding/json" - "fmt" + "errors" "io/ioutil" "net/http" - "strings" "github.com/gorilla/mux" actorErr "github.com/dapr/go-sdk/actor/error" "github.com/dapr/go-sdk/actor/runtime" - - "github.com/pkg/errors" - "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" ) const ( @@ -74,8 +70,12 @@ type topicEventJSON struct { func (s *Server) registerBaseHandler() { // register subscribe handler f := func(w http.ResponseWriter, r *http.Request) { + subs := make([]*internal.TopicSubscription, 0, len(s.topicRegistrar)) + for _, s := range s.topicRegistrar { + subs = append(subs, s.Subscription) + } w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(s.topicSubscriptions); err != nil { + if err := json.NewEncoder(w).Encode(subs); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -95,11 +95,10 @@ func (s *Server) registerBaseHandler() { w.WriteHeader(http.StatusInternalServerError) return } + w.WriteHeader(http.StatusOK) if _, err = w.Write(data); err != nil { - w.WriteHeader(http.StatusInternalServerError) return } - w.WriteHeader(http.StatusOK) } s.mux.HandleFunc("/dapr/config", fRegister).Methods(http.MethodGet) @@ -178,29 +177,19 @@ func (s *Server) registerBaseHandler() { } // AddTopicEventHandler appends provided event handler with it's name to the service. -func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn func(ctx context.Context, e *common.TopicEvent) (retry bool, err error)) error { +func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error { if sub == nil { return errors.New("subscription required") } - if sub.Topic == "" { - return errors.New("topic name required") - } - if sub.PubsubName == "" { - return errors.New("pub/sub name required") - } + // Route is only required for HTTP but should be specified for the + // app protocol to be interchangeable. if sub.Route == "" { return errors.New("handler route name") } - if fn == nil { - return fmt.Errorf("topic handler required") + if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil { + return err } - if !strings.HasPrefix(sub.Route, "/") { - sub.Route = fmt.Sprintf("/%s", sub.Route) - } - - s.topicSubscriptions = append(s.topicSubscriptions, sub) - s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc( func(w http.ResponseWriter, r *http.Request) { // check for post with no data diff --git a/service/http/topic_test.go b/service/http/topic_test.go index 66b51b58..282aabc4 100644 --- a/service/http/topic_test.go +++ b/service/http/topic_test.go @@ -18,7 +18,10 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" + "net/http/httptest" + "sort" "strings" "testing" @@ -26,8 +29,10 @@ import ( "github.com/dapr/go-sdk/actor/mock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" ) func testTopicFunc(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { @@ -90,8 +95,52 @@ func TestEventHandler(t *testing.T) { err = s.AddTopicEventHandler(sub2, testErrorTopicFunc) assert.NoErrorf(t, err, "error adding error event handler") + sub3 := &common.Subscription{ + PubsubName: "messages", + Topic: "test", + Route: "/other", + Match: `event.type == "other"`, + Priority: 1, + } + err = s.AddTopicEventHandler(sub3, testTopicFunc) + assert.NoErrorf(t, err, "error adding error event handler") + s.registerBaseHandler() + req, err := http.NewRequest(http.MethodGet, "/dapr/subscribe", nil) + require.NoErrorf(t, err, "error creating request: %s", data) + req.Header.Set("Accept", "application/json") + rr := httptest.NewRecorder() + s.mux.ServeHTTP(rr, req) + resp := rr.Result() + defer resp.Body.Close() + payload, err := io.ReadAll(resp.Body) + require.NoErrorf(t, err, "error reading response") + var subs []internal.TopicSubscription + require.NoErrorf(t, json.Unmarshal(payload, &subs), "could not decode subscribe response") + + sort.Slice(subs, func(i, j int) bool { + less := strings.Compare(subs[i].PubsubName, subs[j].PubsubName) + if less != 0 { + return less < 0 + } + return strings.Compare(subs[i].Topic, subs[j].Topic) <= 0 + }) + + if assert.Lenf(t, subs, 2, "unexpected subscription count") { + assert.Equal(t, "messages", subs[0].PubsubName) + assert.Equal(t, "errors", subs[0].Topic) + + assert.Equal(t, "messages", subs[1].PubsubName) + assert.Equal(t, "test", subs[1].Topic) + assert.Equal(t, "", subs[1].Route) + assert.Equal(t, "/", subs[1].Routes.Default) + if assert.Lenf(t, subs[1].Routes.Rules, 1, "unexpected rules count") { + assert.Equal(t, `event.type == "other"`, subs[1].Routes.Rules[0].Match) + assert.Equal(t, "/other", subs[1].Routes.Rules[0].Path) + } + } + makeEventRequest(t, s, "/", data, http.StatusOK) makeEventRequest(t, s, "/", "", http.StatusSeeOther) makeEventRequest(t, s, "/", "not JSON", http.StatusSeeOther) diff --git a/service/internal/topicregistrar.go b/service/internal/topicregistrar.go new file mode 100644 index 00000000..e7739987 --- /dev/null +++ b/service/internal/topicregistrar.go @@ -0,0 +1,56 @@ +package internal + +import ( + "errors" + "fmt" + + "github.com/dapr/go-sdk/service/common" +) + +// TopicRegistrar is a map of - to `TopicRegistration` +// and acts as a lookup as the application is building up subscriptions with +// potentially multiple routes per topic. +type TopicRegistrar map[string]*TopicRegistration + +// TopicRegistration encapsulates the subscription and handlers. +type TopicRegistration struct { + Subscription *TopicSubscription + DefaultHandler common.TopicEventHandler + RouteHandlers map[string]common.TopicEventHandler +} + +func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.TopicEventHandler) error { + if sub.Topic == "" { + return errors.New("topic name required") + } + if sub.PubsubName == "" { + return errors.New("pub/sub name required") + } + if fn == nil { + return fmt.Errorf("topic handler required") + } + key := sub.PubsubName + "-" + sub.Topic + ts, ok := m[key] + if !ok { + ts = &TopicRegistration{ + Subscription: NewTopicSubscription(sub.PubsubName, sub.Topic), + RouteHandlers: make(map[string]common.TopicEventHandler), + DefaultHandler: nil, + } + m[key] = ts + } + + if sub.Match != "" { + if err := ts.Subscription.AddRoutingRule(sub.Route, sub.Match, sub.Priority); err != nil { + return err + } + } else { + if err := ts.Subscription.SetDefaultRoute(sub.Route); err != nil { + return err + } + ts.DefaultHandler = fn + } + ts.RouteHandlers[sub.Route] = fn + + return nil +} diff --git a/service/internal/topicregistrar_test.go b/service/internal/topicregistrar_test.go new file mode 100644 index 00000000..a4360326 --- /dev/null +++ b/service/internal/topicregistrar_test.go @@ -0,0 +1,73 @@ +package internal_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/internal" +) + +func TestTopicRegistrarValidation(t *testing.T) { + fn := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) { + return false, nil + } + tests := map[string]struct { + sub common.Subscription + fn common.TopicEventHandler + err string + }{ + "pubsub required": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "", + Topic: "test", + }, fn, "pub/sub name required", + }, + "topic required": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "test", + Topic: "", + }, fn, "topic name required", + }, + "handler required": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "test", + Topic: "test", + }, nil, "topic handler required", + }, + "route required for routing rule": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "test", + Topic: "test", + Route: "", + Match: `event.type == "test"`, + }, fn, "path is required for routing rules", + }, + "success default route": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "test", + Topic: "test", + }, fn, "", + }, + "success routing rule": { + common.Subscription{ //nolint:exhaustivestruct + PubsubName: "test", + Topic: "test", + Route: "/test", + Match: `event.type == "test"`, + }, fn, "", + }, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + m := internal.TopicRegistrar{} + if tt.err != "" { + assert.EqualError(t, m.AddSubscription(&tt.sub, tt.fn), tt.err) + } else { + assert.NoError(t, m.AddSubscription(&tt.sub, tt.fn)) + } + }) + } +} diff --git a/service/internal/topicsubscription.go b/service/internal/topicsubscription.go new file mode 100644 index 00000000..ca628993 --- /dev/null +++ b/service/internal/topicsubscription.go @@ -0,0 +1,112 @@ +package internal + +import ( + "errors" + "fmt" + "sort" +) + +// TopicSubscription internally represents single topic subscription. +type TopicSubscription struct { + // PubsubName is name of the pub/sub this message came from. + PubsubName string `json:"pubsubname"` + // Topic is the name of the topic. + Topic string `json:"topic"` + // Route is the route of the handler where HTTP topic events should be published (passed as Path in gRPC). + Route string `json:"route,omitempty"` + // Routes specify multiple routes where topic events should be sent. + Routes *TopicRoutes `json:"routes,omitempty"` + // Metadata is the subscription metadata. + Metadata map[string]string `json:"metadata,omitempty"` +} + +// TopicRoutes encapsulates the default route and multiple routing rules. +type TopicRoutes struct { + Rules []TopicRule `json:"rules,omitempty"` + Default string `json:"default,omitempty"` + + // priority is used to track duplicate priorities where priority > 0. + // when priority is not specified (0), then the order in which they + // were added is used. + priorities map[int]struct{} +} + +// TopicRule represents a single routing rule. +type TopicRule struct { + // Match is the CEL expression to match on the CloudEvent envelope. + Match string `json:"match"` + // Path is the HTTP path to post the event to (passed as Path in gRPC). + Path string `json:"path"` + // priority is the optional priority order (low to high) for this rule. + priority int `json:"-"` +} + +// NewTopicSubscription creates a new `TopicSubscription`. +func NewTopicSubscription(pubsubName, topic string) *TopicSubscription { + return &TopicSubscription{ //nolint:exhaustivestruct + PubsubName: pubsubName, + Topic: topic, + } +} + +// SetMetadata sets the metadata for the subscription if not already set. +// An error is returned if it is already set. +func (s *TopicSubscription) SetMetadata(metadata map[string]string) error { + if s.Metadata != nil { + return fmt.Errorf("subscription for topic %s on pubsub %s already has metadata set", s.Topic, s.PubsubName) + } + s.Metadata = metadata + + return nil +} + +// SetDefaultRoute sets the default route if not already set. +// An error is returned if it is already set. +func (s *TopicSubscription) SetDefaultRoute(path string) error { + if s.Routes == nil { + if s.Route != "" { + return fmt.Errorf("subscription for topic %s on pubsub %s already has route %s", s.Topic, s.PubsubName, s.Route) + } + s.Route = path + } else { + if s.Routes.Default != "" { + return fmt.Errorf("subscription for topic %s on pubsub %s already has route %s", s.Topic, s.PubsubName, s.Routes.Default) + } + s.Routes.Default = path + } + + return nil +} + +// AddRoutingRule adds a routing rule. +// An error is returned if a there id a duplicate priority > 1. +func (s *TopicSubscription) AddRoutingRule(path, match string, priority int) error { + if path == "" { + return errors.New("path is required for routing rules") + } + if s.Routes == nil { + s.Routes = &TopicRoutes{ //nolint:exhaustivestruct + Default: s.Route, + priorities: map[int]struct{}{}, + } + s.Route = "" + } + if priority > 0 { + if _, exists := s.Routes.priorities[priority]; exists { + return fmt.Errorf("subscription for topic %s on pubsub %s already has a routing rule with priority %d", s.Topic, s.PubsubName, priority) + } + } + s.Routes.Rules = append(s.Routes.Rules, TopicRule{ + Match: match, + Path: path, + priority: priority, + }) + sort.SliceStable(s.Routes.Rules, func(i, j int) bool { + return s.Routes.Rules[i].priority < s.Routes.Rules[j].priority + }) + if priority > 0 { + s.Routes.priorities[priority] = struct{}{} + } + + return nil +} diff --git a/service/internal/topicsubscription_test.go b/service/internal/topicsubscription_test.go new file mode 100644 index 00000000..7e30366c --- /dev/null +++ b/service/internal/topicsubscription_test.go @@ -0,0 +1,72 @@ +package internal_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/go-sdk/service/internal" +) + +func TestTopicSubscripiton(t *testing.T) { + t.Run("duplicate metadata", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.SetMetadata(map[string]string{ + "test": "test", + })) + assert.EqualError(t, sub.SetMetadata(map[string]string{ + "test": "test", + }), "subscription for topic mytopic on pubsub test already has metadata set") + }) + + t.Run("duplicate route", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.SetDefaultRoute("/test")) + assert.Equal(t, "/test", sub.Route) + assert.EqualError(t, sub.SetDefaultRoute("/test"), + "subscription for topic mytopic on pubsub test already has route /test") + }) + + t.Run("duplicate route after routing rule", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0)) + assert.NoError(t, sub.SetDefaultRoute("/test")) + assert.EqualError(t, sub.SetDefaultRoute("/test"), + "subscription for topic mytopic on pubsub test already has route /test") + }) + + t.Run("default route after routing rule", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.SetDefaultRoute("/test")) + assert.Equal(t, "/test", sub.Route) + assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "test"`, 0)) + assert.Equal(t, "", sub.Route) + assert.Equal(t, "/test", sub.Routes.Default) + assert.EqualError(t, sub.SetDefaultRoute("/test"), + "subscription for topic mytopic on pubsub test already has route /test") + }) + + t.Run("duplicate routing rule priority", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.AddRoutingRule("/other", `event.type == "other"`, 1)) + assert.EqualError(t, sub.AddRoutingRule("/test", `event.type == "test"`, 1), + "subscription for topic mytopic on pubsub test already has a routing rule with priority 1") + }) + + t.Run("priority ordering", func(t *testing.T) { + sub := internal.NewTopicSubscription("test", "mytopic") + assert.NoError(t, sub.AddRoutingRule("/100", `event.type == "100"`, 100)) + assert.NoError(t, sub.AddRoutingRule("/1", `event.type == "1"`, 1)) + assert.NoError(t, sub.AddRoutingRule("/50", `event.type == "50"`, 50)) + assert.NoError(t, sub.SetDefaultRoute("/default")) + assert.Equal(t, "/default", sub.Routes.Default) + if assert.Len(t, sub.Routes.Rules, 3) { + assert.Equal(t, "/1", sub.Routes.Rules[0].Path) + assert.Equal(t, `event.type == "1"`, sub.Routes.Rules[0].Match) + assert.Equal(t, "/50", sub.Routes.Rules[1].Path) + assert.Equal(t, `event.type == "50"`, sub.Routes.Rules[1].Match) + assert.Equal(t, "/100", sub.Routes.Rules[2].Path) + assert.Equal(t, `event.type == "100"`, sub.Routes.Rules[2].Match) + } + }) +}