Skip to content

Commit

Permalink
Remove race from TestRecur, and add more documentation (comments).
Browse files Browse the repository at this point in the history
  • Loading branch information
tooolbox committed May 26, 2020
1 parent 1a08633 commit 75fc87b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 11 deletions.
4 changes: 2 additions & 2 deletions job/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (c *MemoryJobCache) Start(persistWaitTime time.Duration) {
}
for _, j := range allJobs {
if j.ShouldStartWaiting() {
j.StartWaiting(c)
j.StartWaiting(c, false)
}
err = c.Set(j)
if err != nil {
Expand Down Expand Up @@ -195,7 +195,7 @@ func (c *LockFreeJobCache) Start(persistWaitTime time.Duration, jobstatTtl time.
continue
}
if j.ShouldStartWaiting() {
j.StartWaiting(c)
j.StartWaiting(c, false)
}
log.Infof("Job %s:%s added to cache.", j.Name, j.Id)
err := c.Set(j)
Expand Down
21 changes: 14 additions & 7 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Job struct {
IsDone bool `json:"is_done"`

// The job will send on this channel when it's done running; used for tests.
// Note that if the job should be rescheduled, it will send on this channel
// when it's done rescheduling rather than when the job is done running.
// That's most useful for testing the scheduling aspect of jobs.
ranChan chan struct{}
}

Expand Down Expand Up @@ -228,7 +231,7 @@ func (j *Job) Init(cache JobCache) error {
}

j.lock.Unlock()
j.StartWaiting(cache)
j.StartWaiting(cache, false)

j.lock.Lock()

Expand Down Expand Up @@ -302,7 +305,7 @@ func (j *Job) InitDelayDuration(checkTime bool) error {
}

// StartWaiting begins a timer for when it should execute the Jobs .Run() method.
func (j *Job) StartWaiting(cache JobCache) {
func (j *Job) StartWaiting(cache JobCache, justRan bool) {
waitDuration := j.GetWaitDuration()

j.lock.Lock()
Expand All @@ -314,6 +317,10 @@ func (j *Job) StartWaiting(cache JobCache) {

jobRun := func() { j.Run(cache) }
j.jobTimer = j.clk.Time().AfterFunc(waitDuration, jobRun)

if justRan && j.ranChan != nil {
j.ranChan <- struct{}{}
}
}

func (j *Job) GetWaitDuration() time.Duration {
Expand Down Expand Up @@ -377,7 +384,7 @@ func (j *Job) Enable(cache JobCache) {
defer j.lock.Unlock()

if j.jobTimer != nil && j.Disabled {
go j.StartWaiting(cache)
go j.StartWaiting(cache, false)
}
j.Disabled = false
}
Expand Down Expand Up @@ -486,13 +493,13 @@ func (j *Job) Run(cache JobCache) {
}

if j.ShouldStartWaiting() {
go j.StartWaiting(cache)
go j.StartWaiting(cache, true)
} else {
j.IsDone = true
}

if j.ranChan != nil {
j.ranChan <- struct{}{}
if j.ranChan != nil {
j.ranChan <- struct{}{}
}
}

j.lock.Unlock()
Expand Down
23 changes: 21 additions & 2 deletions job/job_recurring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ var recurTableTests = []struct {
},
}

// This test works by using a series of checkpoints, spaced <interval> apart.
// A job is scheduled 5 seconds after the first checkpoint.
// By moving the clock to each checkpoint, and then 6 seconds later,
// you can verify that the job hasn't run between the two checkpoints,
// and only runs at the scheduled point.
//
// This is useful for ensuring that durations behave correctly on a grand scale.
func TestRecur(t *testing.T) {

for _, testStruct := range recurTableTests {
Expand All @@ -122,23 +129,35 @@ func TestRecur(t *testing.T) {
start := now.Add(time.Second * 5)
j := GetMockRecurringJobWithSchedule(start, testStruct.Interval)
j.clk.SetClock(clk)
j.ResumeAtNextScheduledTime = true // This is important to have on so that there's no drift.

cache := NewMockCache()
j.Init(cache)
j.ranChan = make(chan struct{})

checkpoints := append([]string{testStruct.Start}, testStruct.Checkpoints...)

for i, chk := range checkpoints {

clk.SetTime(parseTimeInLocation(t, chk, testStruct.Location))
briefPause()

select {
case <-j.ranChan:
t.Fatalf("Expected job not run on checkpoint %d of test %s.", i, testStruct.Name)
case <-time.After(time.Second):
}

j.lock.RLock()
assert.Equal(t, i, int(j.Metadata.SuccessCount), fmt.Sprintf("1st Test of %s index %d", testStruct.Name, i))
j.lock.RUnlock()

clk.AddTime(time.Second * 6)
briefPause()

select {
case <-j.ranChan:
case <-time.After(time.Second):
t.Fatalf("Expected job to have run on checkpoint %d of test %s.", i, testStruct.Name)
}

j.lock.RLock()
assert.Equal(t, i+1, int(j.Metadata.SuccessCount), fmt.Sprintf("2nd Test of %s index %d", testStruct.Name, i))
Expand Down

0 comments on commit 75fc87b

Please sign in to comment.