Skip to content

Commit 5f95d28

Browse files
authoredFeb 12, 2021
Add e2e GRPC tests for bindings (dapr#2704)
Add e2e GRPC tests for bindings Replicate the existing bindings e2e tests to utilize GRPC in addition to HTTP. dapr#1167
1 parent cb1f9a4 commit 5f95d28

File tree

10 files changed

+1618
-9
lines changed

10 files changed

+1618
-9
lines changed
 

‎go.mod

+4-2
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ require (
3636
go.opencensus.io v0.22.5
3737
go.opentelemetry.io/otel v0.13.0
3838
go.uber.org/atomic v1.6.0
39-
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a
40-
google.golang.org/grpc v1.33.1
39+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb // indirect
40+
golang.org/x/sys v0.0.0-20201202213521-69691e467435 // indirect
41+
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446
42+
google.golang.org/grpc v1.34.0
4143
google.golang.org/protobuf v1.25.0
4244
gopkg.in/yaml.v2 v2.3.0
4345
k8s.io/api v0.20.0

‎go.sum

+10
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp
265265
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
266266
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
267267
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
268+
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
268269
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
269270
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
270271
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@@ -367,6 +368,7 @@ github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4s
367368
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
368369
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
369370
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
371+
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
370372
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
371373
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
372374
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -1368,6 +1370,8 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
13681370
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
13691371
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
13701372
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
1373+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
1374+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
13711375
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
13721376
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
13731377
golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1459,6 +1463,8 @@ golang.org/x/sys v0.0.0-20201015000850-e3ed0017c211/go.mod h1:h1NjWce9XRLGQEsW7w
14591463
golang.org/x/sys v0.0.0-20201022201747-fb209a7c41cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
14601464
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd h1:5CtCZbICpIOFdgO940moixOPjc0178IU44m4EjOO5IY=
14611465
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
1466+
golang.org/x/sys v0.0.0-20201202213521-69691e467435 h1:25AvDqqB9PrNqj1FLf2/70I4W0L19qqoaFq3gjNwbKk=
1467+
golang.org/x/sys v0.0.0-20201202213521-69691e467435/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
14621468
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM=
14631469
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
14641470
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -1630,6 +1636,8 @@ google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6D
16301636
google.golang.org/genproto v0.0.0-20201022181438-0ff5f38871d5/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
16311637
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a h1:pOwg4OoaRYScjmR4LlLgdtnyoHYTSAVhhqe5uPdpII8=
16321638
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
1639+
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446 h1:65ppmIPdaZE+BO34gntwqexoTYr30IRNGmS0OGOHu3A=
1640+
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
16331641
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
16341642
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
16351643
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -1655,6 +1663,8 @@ google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=
16551663
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
16561664
google.golang.org/grpc v1.33.1 h1:DGeFlSan2f+WEtCERJ4J9GJWk15TxUi8QGagfI87Xyc=
16571665
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
1666+
google.golang.org/grpc v1.34.0 h1:raiipEjMOIC/TO2AvyTxP25XFdLxNIBwzDh3FM3XztI=
1667+
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
16581668
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
16591669
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
16601670
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

‎tests/apps/binding_input/app.go

+2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func indexHandler(w http.ResponseWriter, r *http.Request) {
7979
}
8080

8181
func testTopicHandler(w http.ResponseWriter, r *http.Request) {
82+
log.Printf("testTopicHandler called")
8283
if r.Method == http.MethodOptions {
8384
log.Println("test-topic binding input has been accepted")
8485
// Sending StatusOK back to the topic, so it will not attempt to redeliver on session restart.
@@ -89,6 +90,7 @@ func testTopicHandler(w http.ResponseWriter, r *http.Request) {
8990

9091
var message string
9192
err := json.NewDecoder(r.Body).Decode(&message)
93+
log.Printf("Got message: %s", message)
9294
if err != nil {
9395
log.Printf("error parsing test-topic input binding payload: %s", err)
9496
w.WriteHeader(http.StatusOK)

‎tests/apps/binding_input_grpc/app.go

+177
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// ------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation.
3+
// Licensed under the MIT License.
4+
// ------------------------------------------------------------
5+
6+
package main
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"log"
13+
"net"
14+
"sync"
15+
16+
"github.com/golang/protobuf/ptypes/any"
17+
"github.com/golang/protobuf/ptypes/empty"
18+
19+
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
20+
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
21+
22+
"google.golang.org/grpc"
23+
)
24+
25+
const (
26+
appPort = "3000"
27+
)
28+
29+
// server is our user app
30+
type server struct {
31+
}
32+
33+
type messageBuffer struct {
34+
lock *sync.RWMutex
35+
successMessages []string
36+
// errorOnce is used to make sure that message is failed only once.
37+
errorOnce bool
38+
failedMessage string
39+
}
40+
41+
func (m *messageBuffer) add(message string) {
42+
m.lock.Lock()
43+
defer m.lock.Unlock()
44+
m.successMessages = append(m.successMessages, message)
45+
}
46+
47+
func (m *messageBuffer) getAllSuccessful() []string {
48+
m.lock.RLock()
49+
defer m.lock.RUnlock()
50+
return m.successMessages
51+
}
52+
53+
func (m *messageBuffer) getFailed() string {
54+
m.lock.RLock()
55+
defer m.lock.RUnlock()
56+
return m.failedMessage
57+
}
58+
59+
func (m *messageBuffer) fail(failedMessage string) bool {
60+
m.lock.Lock()
61+
defer m.lock.Unlock()
62+
// fail only for the first time. return false all other times.
63+
if !m.errorOnce {
64+
m.failedMessage = failedMessage
65+
m.errorOnce = true
66+
return m.errorOnce
67+
}
68+
return false
69+
}
70+
71+
var messages messageBuffer = messageBuffer{
72+
lock: &sync.RWMutex{},
73+
successMessages: []string{},
74+
}
75+
76+
type receivedMessagesResponse struct {
77+
ReceivedMessages []string `json:"received_messages,omitempty"`
78+
Message string `json:"message,omitempty"`
79+
FailedMessage string `json:"failed_message,omitempty"`
80+
}
81+
82+
func main() {
83+
log.Printf("Initializing grpc")
84+
85+
/* #nosec */
86+
lis, err := net.Listen("tcp", fmt.Sprintf(":%s", appPort))
87+
if err != nil {
88+
log.Fatalf("failed to listen: %v", err)
89+
}
90+
91+
/* #nosec */
92+
s := grpc.NewServer()
93+
pb.RegisterAppCallbackServer(s, &server{})
94+
95+
log.Println("Client starting...")
96+
97+
if err := s.Serve(lis); err != nil {
98+
log.Fatalf("failed to serve: %v", err)
99+
}
100+
}
101+
102+
func (s *server) OnInvoke(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
103+
fmt.Printf("Got invoked method %s and data: %s\n", in.Method, string(in.GetData().Value))
104+
105+
switch in.Method {
106+
case "GetReceivedTopics":
107+
return s.GetReceivedTopics(ctx, in)
108+
}
109+
110+
return &commonv1pb.InvokeResponse{}, nil
111+
}
112+
113+
func (s *server) GetReceivedTopics(ctx context.Context, in *commonv1pb.InvokeRequest) (*commonv1pb.InvokeResponse, error) {
114+
failedMessage := messages.getFailed()
115+
log.Printf("failed message %s", failedMessage)
116+
resp := receivedMessagesResponse{
117+
ReceivedMessages: messages.getAllSuccessful(),
118+
FailedMessage: failedMessage,
119+
}
120+
rawResp, err := json.Marshal(resp)
121+
if err != nil {
122+
log.Printf("Could not encode response: %s", err.Error())
123+
return &commonv1pb.InvokeResponse{}, err
124+
}
125+
data := any.Any{
126+
Value: rawResp,
127+
}
128+
return &commonv1pb.InvokeResponse{
129+
Data: &data,
130+
}, nil
131+
}
132+
133+
// Dapr will call this method to get the list of topics the app wants to subscribe to.
134+
func (s *server) ListTopicSubscriptions(ctx context.Context, in *empty.Empty) (*pb.ListTopicSubscriptionsResponse, error) {
135+
log.Println("List Topic Subscription called")
136+
return &pb.ListTopicSubscriptionsResponse{
137+
Subscriptions: []*pb.TopicSubscription{},
138+
}, nil
139+
}
140+
141+
// This method is fired whenever a message has been published to a topic that has been subscribed. Dapr sends published messages in a CloudEvents 1.0 envelope.
142+
func (s *server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*pb.TopicEventResponse, error) {
143+
log.Printf("Message arrived - Topic: %s, Message: %s\n", in.Topic, string(in.Data))
144+
145+
var message string
146+
err := json.Unmarshal(in.Data, &message)
147+
log.Printf("Got message: %s", message)
148+
if err != nil {
149+
log.Printf("error parsing test-topic input binding payload: %s", err)
150+
return &pb.TopicEventResponse{}, nil
151+
}
152+
if fail := messages.fail(message); fail {
153+
// simulate failure. fail only for the first time.
154+
log.Print("failing message")
155+
return &pb.TopicEventResponse{}, nil
156+
}
157+
messages.add(message)
158+
159+
return &pb.TopicEventResponse{
160+
Status: pb.TopicEventResponse_SUCCESS,
161+
}, nil
162+
}
163+
164+
func (s *server) ListInputBindings(ctx context.Context, in *empty.Empty) (*pb.ListInputBindingsResponse, error) {
165+
log.Println("List Input Bindings called")
166+
return &pb.ListInputBindingsResponse{
167+
Bindings: []string{
168+
"test-topic-grpc",
169+
},
170+
}, nil
171+
}
172+
173+
// This method gets invoked every time a new event is fired from a registered binding. The message carries the binding name, a payload and optional metadata
174+
func (s *server) OnBindingEvent(ctx context.Context, in *pb.BindingEventRequest) (*pb.BindingEventResponse, error) {
175+
fmt.Printf("Invoked from binding: %s - %s\n", in.Name, string(in.Data))
176+
return &pb.BindingEventResponse{}, nil
177+
}

‎tests/apps/binding_output/app.go

+86-2
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,25 @@ package main
77

88
import (
99
"bytes"
10+
"context"
1011
"encoding/json"
1112
"fmt"
1213
"log"
1314
"net/http"
1415

16+
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
17+
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
18+
"github.com/golang/protobuf/ptypes/any"
19+
20+
"google.golang.org/grpc"
21+
1522
"github.com/gorilla/mux"
1623
)
1724

1825
const (
19-
appPort = 3000
20-
daprPort = 3500
26+
appPort = 3000
27+
daprPort = 3500
28+
daprPortGRPC = 50001
2129
)
2230

2331
type testCommandRequest struct {
@@ -40,6 +48,7 @@ func indexHandler(w http.ResponseWriter, r *http.Request) {
4048
}
4149

4250
func testHandler(w http.ResponseWriter, r *http.Request) {
51+
log.Printf("Entered testHandler")
4352
var requestBody testCommandRequest
4453
err := json.NewDecoder(r.Body).Decode(&requestBody)
4554
if err != nil {
@@ -76,12 +85,87 @@ func testHandler(w http.ResponseWriter, r *http.Request) {
7685
w.WriteHeader(http.StatusOK)
7786
}
7887

88+
func sendGRPC(w http.ResponseWriter, r *http.Request) {
89+
log.Printf("Entered sendGRPC")
90+
var requestBody testCommandRequest
91+
err := json.NewDecoder(r.Body).Decode(&requestBody)
92+
if err != nil {
93+
log.Printf("error parsing request body: %s", err)
94+
w.WriteHeader(http.StatusBadRequest)
95+
return
96+
}
97+
98+
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", daprPortGRPC), grpc.WithInsecure())
99+
if err != nil {
100+
log.Printf("Could not make dapr client: %s", err.Error())
101+
w.WriteHeader(http.StatusInternalServerError)
102+
return
103+
}
104+
defer conn.Close()
105+
106+
client := runtimev1pb.NewDaprClient(conn)
107+
108+
for _, message := range requestBody.Messages {
109+
body, _ := json.Marshal(&message)
110+
111+
log.Printf("Sending message: %s", body)
112+
req := runtimev1pb.InvokeBindingRequest{
113+
Name: "test-topic-grpc",
114+
Data: body,
115+
Operation: "create",
116+
}
117+
118+
_, err = client.InvokeBinding(context.Background(), &req)
119+
if err != nil {
120+
log.Printf("Error sending request to GRPC output binding: %s", err)
121+
w.WriteHeader(http.StatusInternalServerError)
122+
w.Write([]byte("error: " + err.Error()))
123+
return
124+
}
125+
}
126+
}
127+
128+
func getReceivedTopicsGRPC(w http.ResponseWriter, r *http.Request) {
129+
log.Printf("Entered getReceivedTopicsGRPC")
130+
131+
conn, err := grpc.Dial(fmt.Sprintf("localhost:%d", daprPortGRPC), grpc.WithInsecure())
132+
if err != nil {
133+
log.Printf("Could not make dapr client: %s", err.Error())
134+
w.WriteHeader(http.StatusInternalServerError)
135+
return
136+
}
137+
defer conn.Close()
138+
139+
client := runtimev1pb.NewDaprClient(conn)
140+
141+
req := runtimev1pb.InvokeServiceRequest{
142+
Id: "bindinginputgrpc",
143+
Message: &commonv1pb.InvokeRequest{
144+
Method: "GetReceivedTopics",
145+
Data: &any.Any{},
146+
HttpExtension: &commonv1pb.HTTPExtension{
147+
Verb: commonv1pb.HTTPExtension_POST,
148+
},
149+
},
150+
}
151+
152+
resp, err := client.InvokeService(context.Background(), &req)
153+
if err != nil {
154+
log.Printf("Could not get received messages: %s", err.Error())
155+
w.WriteHeader(http.StatusInternalServerError)
156+
return
157+
}
158+
w.Write(resp.Data.Value)
159+
}
160+
79161
// appRouter initializes restful api router
80162
func appRouter() *mux.Router {
81163
router := mux.NewRouter().StrictSlash(true)
82164

83165
router.HandleFunc("/", indexHandler).Methods("GET")
84166
router.HandleFunc("/tests/send", testHandler).Methods("POST")
167+
router.HandleFunc("/tests/sendGRPC", sendGRPC).Methods("POST")
168+
router.HandleFunc("/tests/get_received_topics_grpc", getReceivedTopicsGRPC).Methods("POST")
85169

86170
router.Use(mux.CORSMethodMiddleware(router))
87171

‎tests/apps/binding_output/go.mod

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ module app
22

33
go 1.15
44

5-
require github.com/gorilla/mux v1.7.3
5+
require (
6+
github.com/dapr/dapr v0.11.3
7+
github.com/golang/protobuf v1.4.3
8+
github.com/gorilla/mux v1.7.3
9+
google.golang.org/api v0.39.0
10+
google.golang.org/grpc v1.34.0
11+
)
612

713
replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36

‎tests/apps/binding_output/go.sum

+1,270
Large diffs are not rendered by default.
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# ------------------------------------------------------------
2+
# Copyright (c) Microsoft Corporation.
3+
# Licensed under the MIT License.
4+
# ------------------------------------------------------------
5+
6+
apiVersion: dapr.io/v1alpha1
7+
kind: Component
8+
metadata:
9+
name: test-topic-grpc
10+
spec:
11+
type: bindings.kafka
12+
initTimeout: 1m
13+
version: v1
14+
metadata:
15+
# Kafka broker connection setting
16+
- name: brokers
17+
value: dapr-kafka:9092
18+
# consumer configuration: topic and consumer group
19+
- name: topics
20+
value: dapr-test-grpc
21+
- name: consumerGroup
22+
value: group1
23+
# publisher configuration: topic
24+
- name: publishTopic
25+
value: dapr-test-grpc
26+
- name: authRequired
27+
value: "false"

‎tests/dapr_tests.mk

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ secretapp \
1414
service_invocation \
1515
service_invocation_grpc \
1616
binding_input \
17+
binding_input_grpc \
1718
binding_output \
1819
pubsub-publisher \
1920
pubsub-subscriber \
@@ -232,6 +233,7 @@ setup-test-components: setup-app-configurations
232233
$(KUBECTL) apply -f ./tests/config/dapr_tests_cluster_role_binding.yaml --namespace $(DAPR_TEST_NAMESPACE)
233234
$(KUBECTL) apply -f ./tests/config/dapr_redis_pubsub.yaml --namespace $(DAPR_TEST_NAMESPACE)
234235
$(KUBECTL) apply -f ./tests/config/dapr_kafka_bindings.yaml --namespace $(DAPR_TEST_NAMESPACE)
236+
$(KUBECTL) apply -f ./tests/config/dapr_kafka_bindings_grpc.yaml --namespace $(DAPR_TEST_NAMESPACE)
235237
$(KUBECTL) apply -f ./tests/config/app_topic_subscription_pubsub.yaml --namespace $(DAPR_TEST_NAMESPACE)
236238
$(KUBECTL) apply -f ./tests/config/kubernetes_allowlists_config.yaml --namespace $(DAPR_TEST_NAMESPACE)
237239
$(KUBECTL) apply -f ./tests/config/kubernetes_allowlists_grpc_config.yaml --namespace $(DAPR_TEST_NAMESPACE)

‎tests/e2e/bindings/bindings_test.go

+33-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/dapr/dapr/tests/e2e/utils"
1919
kube "github.com/dapr/dapr/tests/platforms/kubernetes"
2020
"github.com/dapr/dapr/tests/runner"
21+
2122
"github.com/stretchr/testify/require"
2223
)
2324

@@ -52,7 +53,7 @@ const (
5253
// Number of times to call the endpoint to check for health.
5354
numHealthChecks = 60
5455
// Number of seconds to wait for binding travelling throughout the cluster.
55-
bindingPropagationDelay = 5
56+
bindingPropagationDelay = 10
5657
)
5758

5859
var tr *runner.TestRunner
@@ -75,6 +76,14 @@ func TestMain(m *testing.M) {
7576
Replicas: 1,
7677
IngressEnabled: true,
7778
},
79+
{
80+
AppName: "bindinginputgrpc",
81+
DaprEnabled: true,
82+
ImageName: "e2e-binding_input_grpc",
83+
Replicas: 1,
84+
IngressEnabled: true,
85+
AppProtocol: "grpc",
86+
},
7887
}
7988

8089
tr = runner.NewTestRunner("bindings", testApps, nil, nil)
@@ -87,7 +96,8 @@ func TestBindings(t *testing.T) {
8796
require.NotEmpty(t, outputExternalURL, "bindingoutput external URL must not be empty!")
8897
inputExternalURL := tr.Platform.AcquireAppExternalURL("bindinginput")
8998
require.NotEmpty(t, inputExternalURL, "bindinginput external URL must not be empty!")
90-
99+
inputGRPCExternalURL := tr.Platform.AcquireAppExternalURL("bindinginputgrpc")
100+
require.NotEmpty(t, inputGRPCExternalURL, "bindinginput external URL must not be empty!")
91101
// This initial probe makes the test wait a little bit longer when needed,
92102
// making this test less flaky due to delays in the deployment.
93103
_, err := utils.HTTPGetNTimes(outputExternalURL, numHealthChecks)
@@ -102,13 +112,13 @@ func TestBindings(t *testing.T) {
102112
body, err := json.Marshal(req)
103113
require.NoError(t, err)
104114

105-
// act
115+
// act for http
106116
httpPostWithAssert(t, fmt.Sprintf("%s/tests/send", outputExternalURL), body, http.StatusOK)
107117

108118
// This delay allows all the messages to reach corresponding input bindings.
109119
time.Sleep(bindingPropagationDelay * time.Second)
110120

111-
// assert
121+
// assert for HTTP
112122
resp := httpPostWithAssert(t, fmt.Sprintf("%s/tests/get_received_topics", inputExternalURL), nil, http.StatusOK)
113123

114124
var decodedResponse receivedTopicsResponse
@@ -120,6 +130,25 @@ func TestBindings(t *testing.T) {
120130
require.Equal(t, testMessages[1:], decodedResponse.ReceivedMessages)
121131
// one message fails.
122132
require.Equal(t, testMessages[0], decodedResponse.FailedMessage)
133+
134+
// act for gRPC
135+
httpPostWithAssert(t, fmt.Sprintf("%s/tests/sendGRPC", outputExternalURL), body, http.StatusOK)
136+
137+
// This delay allows all the messages to reach corresponding input bindings.
138+
time.Sleep(bindingPropagationDelay * time.Second)
139+
140+
// assert for gRPC
141+
resp = httpPostWithAssert(t, fmt.Sprintf("%s/tests/get_received_topics_grpc", outputExternalURL), nil, http.StatusOK)
142+
143+
// assert for gRPC
144+
err = json.Unmarshal(resp, &decodedResponse)
145+
require.NoError(t, err)
146+
147+
// Only the first message fails, all other messages are successfully consumed.
148+
// nine messages succeed.
149+
require.Equal(t, testMessages[1:], decodedResponse.ReceivedMessages)
150+
// one message fails.
151+
require.Equal(t, testMessages[0], decodedResponse.FailedMessage)
123152
}
124153

125154
func httpPostWithAssert(t *testing.T, url string, data []byte, status int) []byte {

0 commit comments

Comments
 (0)
Please sign in to comment.