Skip to content

Commit

Permalink
Send ack messages for RequestLog messages (stripe#688)
Browse files Browse the repository at this point in the history
* send ack messages for request log too

* tests

* use event id instead of event token
  • Loading branch information
pepin-stripe authored Jun 8, 2021
1 parent c0c9fbd commit 8e48b2d
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 50 deletions.
4 changes: 4 additions & 0 deletions pkg/logtailing/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ func (t *Tailer) processRequestLogEvent(msg websocket.IncomingMessage) {
t.cfg.Log.Debug("Received malformed payload: ", err)
}

// at this point the message is valid so we can acknowledge it
ackMessage := websocket.NewEventAck(requestLogEvent.RequestLogID, "")
t.webSocketClient.SendMessage(ackMessage)

// Don't show stripecli/sessions logs since they're generated by the CLI
if payload.URL == "/v1/stripecli/sessions" {
t.cfg.Log.Debug("Filtering out /v1/stripecli/sessions from logs")
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (p *Proxy) processWebhookEvent(msg websocket.IncomingMessage) {
}).Trace("Webhook event trace")

// at this point the message is valid so we can acknowledge it
ackMessage := websocket.NewWebhookEventAck(webhookEvent.WebhookID, webhookEvent.WebhookConversationID)
ackMessage := websocket.NewEventAck(webhookEvent.WebhookID, webhookEvent.WebhookConversationID)
p.webSocketClient.SendMessage(ackMessage)

if p.filterWebhookEvent(webhookEvent) {
Expand Down
25 changes: 22 additions & 3 deletions pkg/websocket/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,34 @@ func (m *IncomingMessage) UnmarshalJSON(data []byte) error {
func (m OutgoingMessage) MarshalJSON() ([]byte, error) {
if m.WebhookResponse != nil {
return json.Marshal(m.WebhookResponse)
} else if m.WebhookEventAck != nil {
return json.Marshal(m.WebhookEventAck)
} else if m.EventAck != nil {
return json.Marshal(m.EventAck)
}

return json.Marshal(nil)
}

// EventAck represents outgoing Ack messages
// for events received by Stripe.
type EventAck struct {
Type string `json:"type"` // always "event_ack"
WebhookConversationID string `json:"webhook_conversation_id"`
EventID string `json:"event_id"` // ID of the event
}

// NewEventAck returns a new EventAck message.
func NewEventAck(eventID, webhookConversationID string) *OutgoingMessage {
return &OutgoingMessage{
EventAck: &EventAck{
EventID: eventID,
WebhookConversationID: webhookConversationID,
Type: "event_ack",
},
}
}

// OutgoingMessage represents any outgoing message sent to Stripe.
type OutgoingMessage struct {
*WebhookResponse
*WebhookEventAck
*EventAck
}
55 changes: 55 additions & 0 deletions pkg/websocket/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
)

func TestUnmarshalUnknownIncomingMsg(t *testing.T) {
Expand All @@ -14,3 +15,57 @@ func TestUnmarshalUnknownIncomingMsg(t *testing.T) {
err := json.Unmarshal([]byte(data), &msg)
require.EqualError(t, err, "Unexpected message type: unknown_type")
}

func TestMarshalWebhookEventAck(t *testing.T) {
msg := NewEventAck(
"wh_123",
"wc_123",
)

buf, err := json.Marshal(msg)
require.NoError(t, err)

json := string(buf)
require.Equal(t, "wh_123", gjson.Get(json, "event_id").String())
require.Equal(t, "wc_123", gjson.Get(json, "webhook_conversation_id").String())
require.Equal(t, "event_ack", gjson.Get(json, "type").String())
}

func TestMarshalWebhookEventAckRequestLog(t *testing.T) {
msg := NewEventAck(
"wh_123",
"",
)

buf, err := json.Marshal(msg)
require.NoError(t, err)

json := string(buf)
require.Equal(t, "wh_123", gjson.Get(json, "event_id").String())
require.Equal(t, "", gjson.Get(json, "webhook_conversation_id").String())
require.Equal(t, "event_ack", gjson.Get(json, "type").String())
}

func TestNewWebhookEventAck(t *testing.T) {
msg := NewEventAck(
"wh_123",
"wc_123",
)

require.NotNil(t, msg.EventAck)
require.Equal(t, "event_ack", msg.EventAck.Type)
require.Equal(t, "wh_123", msg.EventAck.EventID)
require.Equal(t, "wc_123", msg.EventAck.WebhookConversationID)
}

func TestNewWebhookEventAckRequestLog(t *testing.T) {
msg := NewEventAck(
"wh_123",
"",
)

require.NotNil(t, msg.EventAck)
require.Equal(t, "event_ack", msg.EventAck.Type)
require.Equal(t, "wh_123", msg.EventAck.EventID)
require.Equal(t, "", msg.EventAck.WebhookConversationID)
}
19 changes: 0 additions & 19 deletions pkg/websocket/webhook_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ type WebhookEvent struct {
WebhookID string `json:"webhook_id"`
}

// WebhookEventAck represents outgoing Ack message for a webhook event
// received by Stripe.
type WebhookEventAck struct {
Type string `json:"type"` // always "webhook_event_ack"
WebhookConversationID string `json:"webhook_conversation_id"`
WebhookID string `json:"webhook_id"` // ID of the webhook event
}

// WebhookResponse represents outgoing webhook response messages sent to
// Stripe.
type WebhookResponse struct {
Expand All @@ -50,14 +42,3 @@ func NewWebhookResponse(webhookID, webhookConversationID, forwardURL string, sta
},
}
}

// NewWebhookEventAck returns a new WebhookEventAck message.
func NewWebhookEventAck(webhookID, webhookConversationID string) *OutgoingMessage {
return &OutgoingMessage{
WebhookEventAck: &WebhookEventAck{
WebhookID: webhookID,
WebhookConversationID: webhookConversationID,
Type: "webhook_event_ack",
},
}
}
27 changes: 0 additions & 27 deletions pkg/websocket/webhook_messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,3 @@ func TestNewWebhookResponse(t *testing.T) {
require.Equal(t, "foo", msg.Body)
require.Equal(t, "bar", msg.HTTPHeaders["Response-Header"])
}

func TestMarshalWebhookEventAck(t *testing.T) {
msg := NewWebhookEventAck(
"wh_123",
"wc_123",
)

buf, err := json.Marshal(msg)
require.NoError(t, err)

json := string(buf)
require.Equal(t, "wh_123", gjson.Get(json, "webhook_id").String())
require.Equal(t, "wc_123", gjson.Get(json, "webhook_conversation_id").String())
require.Equal(t, "webhook_event_ack", gjson.Get(json, "type").String())
}

func TestNewWebhookEventAck(t *testing.T) {
msg := NewWebhookEventAck(
"wh_123",
"wc_123",
)

require.NotNil(t, msg.WebhookEventAck)
require.Equal(t, "webhook_event_ack", msg.WebhookEventAck.Type)
require.Equal(t, "wh_123", msg.WebhookEventAck.WebhookID)
require.Equal(t, "wc_123", msg.WebhookEventAck.WebhookConversationID)
}

0 comments on commit 8e48b2d

Please sign in to comment.