Skip to content

Commit

Permalink
TWEAK: pull only locks once (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorchu authored Apr 7, 2023
1 parent a6c4fd9 commit 0990fd1
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions sidekiq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,24 +213,8 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error {
if err != nil {
return err
}
pull := func() error {
lock := &redislock.Lock{
Client: q.client,
Key: fmt.Sprintf("%s:sidekiq-queue-pull:%s", opt.SidekiqNamespace, opt.SidekiqQueue),
ID: uuid.NewString(),
At: time.Now(),
ExpireInSec: 30,
MaxAcquirers: 1,
}
acquired, err := lock.Acquire()
if err != nil {
return err
}
if !acquired {
return redis.Nil
}
defer lock.Release()

pull := func() error {
res, err := q.dequeueScript.Run(context.Background(), q.client, nil,
opt.SidekiqNamespace,
opt.SidekiqQueue,
Expand Down Expand Up @@ -286,7 +270,32 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error {
return nil
}

const timeoutSec = 30
ctx, cancel := context.WithTimeout(context.Background(), timeoutSec*time.Second)
defer cancel()
lock := &redislock.Lock{
Client: q.client,
Key: fmt.Sprintf("%s:sidekiq-queue-pull:%s", opt.SidekiqNamespace, opt.SidekiqQueue),
ID: uuid.NewString(),
At: time.Now(),
ExpireInSec: timeoutSec,
MaxAcquirers: 1,
}
acquired, err := lock.Acquire()
if err != nil {
return err
}
if !acquired {
return nil
}
defer lock.Release()

for {
select {
case <-ctx.Done():
return nil
default:
}
err := pull()
if err != nil {
if errors.Is(err, redis.Nil) {
Expand Down

0 comments on commit 0990fd1

Please sign in to comment.