Skip to content

Commit

Permalink
make sure we don't run jobs that are too old
Browse files Browse the repository at this point in the history
  • Loading branch information
andremedeiros committed Mar 16, 2023
1 parent fc0d290 commit 1996434
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions internal/worker/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,24 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {
_ = nc.statsd.Incr("apollo.consumer.executions", notificationTags, 0.1)
}()

defer func(ctx context.Context) {
_, span := nc.tracer.Start(ctx, "queue:ack")
defer span.End()

if err := delivery.Ack(); err != nil {
span.SetStatus(codes.Error, "failed to acknowledge message")
span.RecordError(err)
logger.Error("failed to acknowledge message", zap.Error(err))
}
}(ctx)

// Measure queue latency
key := fmt.Sprintf("locks:accounts:%s", id)
ttl := nc.redis.PTTL(ctx, key).Val()
if ttl == 0 {
logger.Debug("job is too old, skipping")
return
}
age := (domain.NotificationCheckTimeout - ttl)
_ = nc.statsd.Histogram("apollo.dequeue.latency", float64(age.Milliseconds()), notificationTags, 0.1)

Expand All @@ -171,17 +186,6 @@ func (nc *notificationsConsumer) Consume(delivery rmq.Delivery) {

logger.Debug("starting job")

defer func(ctx context.Context) {
_, span := nc.tracer.Start(ctx, "queue:ack")
defer span.End()

if err := delivery.Ack(); err != nil {
span.SetStatus(codes.Error, "failed to acknowledge message")
span.RecordError(err)
logger.Error("failed to acknowledge message", zap.Error(err))
}
}(ctx)

account, err := nc.accountRepo.GetByRedditID(ctx, id)
if err != nil {
if err != domain.ErrNotFound {
Expand Down

0 comments on commit 1996434

Please sign in to comment.