Skip to content

Commit

Permalink
update scheduler to split account work
Browse files Browse the repository at this point in the history
  • Loading branch information
andremedeiros committed Mar 16, 2023
1 parent b0f7f6f commit 73e7927
Showing 1 changed file with 38 additions and 42 deletions.
80 changes: 38 additions & 42 deletions internal/cmd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
_ "net/http/pprof"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/DataDog/datadog-go/statsd"
Expand All @@ -24,6 +23,7 @@ import (
)

const batchSize = 250
const accountEnqueueSeconds = 60

func SchedulerCmd(ctx context.Context) *cobra.Command {
cmd := &cobra.Command{
Expand Down Expand Up @@ -102,7 +102,7 @@ func SchedulerCmd(ctx context.Context) *cobra.Command {
s := gocron.NewScheduler(time.UTC)
s.SetMaxConcurrentJobs(8, gocron.WaitMode)

eaj, _ := s.Every(10).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj, _ := s.Every(accountEnqueueSeconds).Seconds().Do(func() { enqueueAccounts(ctx, logger, statsd, db, redis, luaSha, notifQueue) })
eaj.SingletonMode()

_, _ = s.Every(5).Seconds().Do(func() { enqueueSubreddits(ctx, logger, statsd, db, []rmq.Queue{subredditQueue, trendingQueue}) })
Expand Down Expand Up @@ -483,60 +483,56 @@ func enqueueAccounts(ctx context.Context, logger *zap.Logger, statsd *statsd.Cli
ids = append(ids, id)
}

var (
enqueued int64 = 0
skipped int64 = 0
)

defer func() {
tags := []string{"queue:notifications"}
_ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1)
_ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1)
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
}()
chunks := [][]string{}
for i := 0; i < accountEnqueueSeconds; i++ {
min := (i * len(ids) / accountEnqueueSeconds)
max := ((i + 1) * len(ids)) / accountEnqueueSeconds
chunks = append(chunks, ids[min:max])
}

logger.Debug("enqueueing account batch", zap.Int("count", len(ids)), zap.Time("start", now))
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), []string{"queue:notifications"}, 1)

// Split ids in batches
wg := sync.WaitGroup{}
for i := 0; i < len(ids); i += batchSize {
for i := 0; i < accountEnqueueSeconds; i++ {
wg.Add(1)
go func(offset int, ctx context.Context) {
defer wg.Done()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

j := offset + batchSize
if j > len(ids) {
j = len(ids)
}
batch := ids[offset:j]
go func(ctx context.Context, offset int) {
ids := chunks[offset]
time.Sleep(time.Duration(offset) * time.Second)

logger.Debug("enqueueing batch", zap.Int("len", len(batch)))

unlocked, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, batch).StringSlice()
enqueued, err := redisConn.EvalSha(ctx, luaSha, []string{"locks:accounts"}, ids).StringSlice()
if err != nil {
logger.Error("failed to check for locked accounts", zap.Error(err))
}

atomic.AddInt64(&skipped, int64(len(batch)-len(unlocked)))
atomic.AddInt64(&enqueued, int64(len(unlocked)))

if len(unlocked) == 0 {
if err = queue.Publish(enqueued...); err != nil {
logger.Error("failed to enqueue account batch",
zap.Error(err),
zap.Int("offset", offset),
zap.Int("candidates", len(ids)),
zap.Int("enqueued", len(enqueued)),
)
return
}

if err = queue.Publish(unlocked...); err != nil {
logger.Error("failed to enqueue account batch", zap.Error(err))
}
}(i, ctx)
logger.Info("enqueued account batch",
zap.Int("offset", offset),
zap.Int("attempts", len(ids)),
zap.Int("enqueued", len(enqueued)),
)

}(ctx, i)
}
wg.Wait()

logger.Info("enqueued account batch",
zap.Int64("count", enqueued),
zap.Int64("skipped", skipped),
zap.Int64("duration", time.Since(now).Milliseconds()),
var (
enqueued int64 = 0
skipped int64 = 0
)

defer func() {
tags := []string{"queue:notifications"}
_ = statsd.Histogram("apollo.queue.enqueued", float64(enqueued), tags, 1)
_ = statsd.Histogram("apollo.queue.skipped", float64(skipped), tags, 1)
_ = statsd.Histogram("apollo.queue.runtime", float64(time.Since(now).Milliseconds()), tags, 1)
}()
}

0 comments on commit 73e7927

Please sign in to comment.