Skip to content

Commit

Permalink
create WebhookEventAck message type and send it when receiving a webh… (
Browse files Browse the repository at this point in the history
stripe#680)

* create WebhookEventAck message type and send it when receiving a webhook event

* field name
  • Loading branch information
pepin-stripe authored Jun 2, 2021
1 parent ea744b9 commit c0c9fbd
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,10 @@ func (p *Proxy) processWebhookEvent(msg websocket.IncomingMessage) {
"api_version": getAPIVersionString(msg.Endpoint.APIVersion),
}).Trace("Webhook event trace")

// at this point the message is valid so we can acknowledge it
ackMessage := websocket.NewWebhookEventAck(webhookEvent.WebhookID, webhookEvent.WebhookConversationID)
p.webSocketClient.SendMessage(ackMessage)

if p.filterWebhookEvent(webhookEvent) {
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/websocket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (c *Client) writePump() {

for {
select {
case whResp, ok := <-c.send:
case outMsg, ok := <-c.send:
err := c.conn.SetWriteDeadline(time.Now().Add(c.cfg.WriteWait))
if err != nil {
c.cfg.Log.Debug("SetWriteDeadline error: ", err)
Expand All @@ -400,13 +400,13 @@ func (c *Client) writePump() {
"prefix": "websocket.Client.writePump",
}).Debug("Sending text message")

err = c.conn.WriteJSON(whResp)
err = c.conn.WriteJSON(outMsg)
if err != nil {
if ws.IsUnexpectedCloseError(err, ws.CloseNormalClosure) {
c.cfg.Log.Error("write error: ", err)
}
// Requeue the message to be processed when writePump restarts
c.send <- whResp
c.send <- outMsg
c.notifyClose <- err

return
Expand Down
3 changes: 3 additions & 0 deletions pkg/websocket/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (m *IncomingMessage) UnmarshalJSON(data []byte) error {
func (m OutgoingMessage) MarshalJSON() ([]byte, error) {
if m.WebhookResponse != nil {
return json.Marshal(m.WebhookResponse)
} else if m.WebhookEventAck != nil {
return json.Marshal(m.WebhookEventAck)
}

return json.Marshal(nil)
Expand All @@ -55,4 +57,5 @@ func (m OutgoingMessage) MarshalJSON() ([]byte, error) {
// OutgoingMessage represents any outgoing message sent to Stripe.
type OutgoingMessage struct {
*WebhookResponse
*WebhookEventAck
}
19 changes: 19 additions & 0 deletions pkg/websocket/webhook_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ type WebhookEvent struct {
WebhookID string `json:"webhook_id"`
}

// WebhookEventAck represents outgoing Ack message for a webhook event
// received by Stripe.
type WebhookEventAck struct {
Type string `json:"type"` // always "webhook_event_ack"
WebhookConversationID string `json:"webhook_conversation_id"`
WebhookID string `json:"webhook_id"` // ID of the webhook event
}

// WebhookResponse represents outgoing webhook response messages sent to
// Stripe.
type WebhookResponse struct {
Expand All @@ -42,3 +50,14 @@ func NewWebhookResponse(webhookID, webhookConversationID, forwardURL string, sta
},
}
}

// NewWebhookEventAck returns a new WebhookEventAck message.
func NewWebhookEventAck(webhookID, webhookConversationID string) *OutgoingMessage {
return &OutgoingMessage{
WebhookEventAck: &WebhookEventAck{
WebhookID: webhookID,
WebhookConversationID: webhookConversationID,
Type: "webhook_event_ack",
},
}
}
33 changes: 30 additions & 3 deletions pkg/websocket/webhook_messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,38 @@ func TestNewWebhookResponse(t *testing.T) {
)

require.NotNil(t, msg.WebhookResponse)
require.Equal(t, "webhook_response", msg.Type)
require.Equal(t, "wh_123", msg.WebhookID)
require.Equal(t, "wc_123", msg.WebhookConversationID)
require.Equal(t, "webhook_response", msg.WebhookResponse.Type)
require.Equal(t, "wh_123", msg.WebhookResponse.WebhookID)
require.Equal(t, "wc_123", msg.WebhookResponse.WebhookConversationID)
require.Equal(t, "http://localhost:5000/webhooks", msg.ForwardURL)
require.Equal(t, 200, msg.Status)
require.Equal(t, "foo", msg.Body)
require.Equal(t, "bar", msg.HTTPHeaders["Response-Header"])
}

func TestMarshalWebhookEventAck(t *testing.T) {
msg := NewWebhookEventAck(
"wh_123",
"wc_123",
)

buf, err := json.Marshal(msg)
require.NoError(t, err)

json := string(buf)
require.Equal(t, "wh_123", gjson.Get(json, "webhook_id").String())
require.Equal(t, "wc_123", gjson.Get(json, "webhook_conversation_id").String())
require.Equal(t, "webhook_event_ack", gjson.Get(json, "type").String())
}

func TestNewWebhookEventAck(t *testing.T) {
msg := NewWebhookEventAck(
"wh_123",
"wc_123",
)

require.NotNil(t, msg.WebhookEventAck)
require.Equal(t, "webhook_event_ack", msg.WebhookEventAck.Type)
require.Equal(t, "wh_123", msg.WebhookEventAck.WebhookID)
require.Equal(t, "wc_123", msg.WebhookEventAck.WebhookConversationID)
}

0 comments on commit c0c9fbd

Please sign in to comment.