Skip to content

Commit

Permalink
Add long-polling support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay committed Jul 30, 2024
1 parent 30106c3 commit 39d5bf9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
12 changes: 7 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ type SQLiteConfig struct {
}

type SQSConfig struct {
Enabled bool `name:"enabled" default:"true" help:"Enable SQS protocol for queue" env:"ENABLED"`
Port int `name:"port" default:"3001" help:"HTTP port for SQS protocol" env:"PORT"`
Keys []AWSKey `name:"keys" default:"DEV_ACCESS_KEY_ID:DEV_SECRET_ACCESS_KEY" env:"KEYS"`
ParseCelery bool `name:"parse-celery" default:"true" env:"PARSE_CELERY" help:"Parse Celery messages. Lets you search by celery message ID and task type."`
MaxRequestSize int `name:"max-request-size" default:"1048576" env:"MAX_REQUEST_SIZE" help:"Max size of SQS request in bytes"`
Enabled bool `name:"enabled" default:"true" help:"Enable SQS protocol for queue" env:"ENABLED"`
Port int `name:"port" default:"3001" help:"HTTP port for SQS protocol" env:"PORT"`
Keys []AWSKey `name:"keys" default:"DEV_ACCESS_KEY_ID:DEV_SECRET_ACCESS_KEY" env:"KEYS"`
ParseCelery bool `name:"parse-celery" default:"true" env:"PARSE_CELERY" help:"Parse Celery messages. Lets you search by celery message ID and task type."`
MaxRequestSize int `name:"max-request-size" default:"1048576" env:"MAX_REQUEST_SIZE" help:"Max size of SQS request in bytes"`
MaxDelaySeconds int `name:"max-delay-seconds" default:"30" env:"MAX_DELAY_SECONDS" help:"Max allowed wait time for long polling"`
DelayRetryMillis int `name:"delay-retry-millis" default:"1000" env:"DELAY_RETRY_MILLIS" help:"When long polling, how often to request new items"`
}

type AWSKey struct {
Expand Down
29 changes: 24 additions & 5 deletions protocols/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,9 @@ func (s *SQS) ReceiveMessage(c *fiber.Ctx, tenantId int64) error {
return err
}

if req.MaxNumberOfMessages == 0 {
req.MaxNumberOfMessages = 1
maxNumberOfMessages := req.MaxNumberOfMessages
if maxNumberOfMessages == 0 {
maxNumberOfMessages = 1
}

// log.Println(req)
Expand All @@ -463,9 +464,27 @@ func (s *SQS) ReceiveMessage(c *fiber.Ctx, tenantId int64) error {
visibilityTimeout = req.VisibilityTimeout
}

messages, err := s.queue.Dequeue(tenantId, queue, req.MaxNumberOfMessages, visibilityTimeout)
if err != nil {
return err
var messages []*models.Message

waitTime := min(req.WaitTimeSeconds, s.cfg.MaxDelaySeconds)
maxRequestTime := time.Now().Add(time.Duration(waitTime) * time.Second)
sleepIncrement := time.Duration(s.cfg.DelayRetryMillis) * time.Millisecond

for {
messages, err = s.queue.Dequeue(tenantId, queue, maxNumberOfMessages, visibilityTimeout)
if err != nil {
return err
}

if len(messages) > 0 {
break
}

if time.Now().Equal(maxRequestTime) || time.Now().After(maxRequestTime) {
break
}

time.Sleep(sleepIncrement)
}

response := ReceiveMessageResponse{
Expand Down

0 comments on commit 39d5bf9

Please sign in to comment.