Refactor ramping VUs executor
The RampingVUs.Run method was complex; this refactors it by adding
rampingVUsRunState to share run state between the functions that
the Run method uses.

+ Makes the Run method easier to read and understand
+ Makes it explicit which goroutines Run starts
+ Separates responsibilities into other parts:
  + rampingVUsRunState and its methods
  + Moves the trackProgress goroutine from makeProgressFn
    to the Run method
  + Makes initializeVUs handle only the initialization of goroutines
  + Uses two strategy functions to make the VU-handling logic
    reusable and clearer; handleVUs and handleRemainingVUs use
    them (see the sketch after this list)
+ Makes the handleVUs algorithm easier to understand and manage
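
Illustrative only (not part of this commit): a minimal Go sketch of the
strategy-function pattern described above. A closure captures the current
VU count and scales it toward each new target; the handle type and
newScaleStrategy are hypothetical stand-ins for the executor's vuHandle
and scheduledVUsHandlerStrategy.

    package main

    import "fmt"

    // handle is an illustrative stand-in for the executor's vuHandle.
    type handle struct{ id uint64 }

    func (h *handle) start() error  { fmt.Println("start", h.id); return nil }
    func (h *handle) gracefulStop() { fmt.Println("stop", h.id) }

    // newScaleStrategy returns a closure that remembers the currently
    // scheduled count and starts or gracefully stops handles until the
    // count matches each new target.
    func newScaleStrategy(handles []*handle) func(target uint64) {
        var cur uint64
        return func(target uint64) {
            for ; cur < target; cur++ {
                _ = handles[cur].start()
            }
            for ; target < cur; cur-- {
                handles[cur-1].gracefulStop()
            }
        }
    }

    func main() {
        handles := make([]*handle, 5)
        for i := range handles {
            handles[i] = &handle{id: uint64(i)}
        }
        scale := newScaleStrategy(handles)
        scale(3) // starts 0, 1, 2
        scale(1) // gracefully stops 2, then 1
    }

Keeping cur inside the closure is what lets handleVUs and
handleRemainingVUs each hold independent counters without sharing
mutable variables across the Run method.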
inancgumus committed Nov 18, 2021
1 parent 434b2e7 commit cf3e5ce
271 changes: 151 additions & 120 deletions lib/executor/ramping_vus.go
@@ -493,154 +493,185 @@ var _ lib.Executor = &RampingVUs{}

 // Run constantly loops through as many iterations as possible on a variable
 // number of VUs for the specified stages.
-//
-// TODO: split up? since this does a ton of things, unfortunately I can't think
-// of a less complex way to implement it (besides the old "increment by 100ms
-// and see what happens")... :/ so maybe see how it can be split?
-// nolint:funlen,gocognit,cyclop
-func (vlv RampingVUs) Run(
-    parentCtx context.Context, out chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics,
-) (err error) {
-    rawExecutionSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
-    regularDuration, isFinal := lib.GetEndOffset(rawExecutionSteps)
-    if !isFinal {
-        return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regularDuration)
+func (vlv RampingVUs) Run(ctx context.Context, _ chan<- stats.SampleContainer, _ *metrics.BuiltinMetrics) error {
+    rawSteps := vlv.config.getRawExecutionSteps(vlv.executionState.ExecutionTuple, true)
+    regDur, finalRaw := lib.GetEndOffset(rawSteps)
+    if !finalRaw {
+        return fmt.Errorf("%s expected raw end offset at %s to be final", vlv.config.GetName(), regDur)
     }

-    gracefulExecutionSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
-    maxDuration, isFinal := lib.GetEndOffset(gracefulExecutionSteps)
-    if !isFinal {
-        return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDuration)
+    gracefulSteps := vlv.config.GetExecutionRequirements(vlv.executionState.ExecutionTuple)
+    maxDur, finalGraceful := lib.GetEndOffset(gracefulSteps)
+    if !finalGraceful {
+        return fmt.Errorf("%s expected graceful end offset at %s to be final", vlv.config.GetName(), maxDur)
     }
-    maxVUs := lib.GetMaxPlannedVUs(gracefulExecutionSteps)
-    gracefulStop := maxDuration - regularDuration
-
-    startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, regularDuration, gracefulStop)
+    startMaxVUs := lib.GetMaxPlannedVUs(gracefulSteps)
+    startTime, maxDurCtx, regDurCtx, cancel := getDurationContexts(ctx, regDur, maxDur-regDur)
     defer cancel()

-    activeVUs := &sync.WaitGroup{}
-    defer activeVUs.Wait()
-
     // Make sure the log and the progress bar have accurate information
     vlv.logger.WithFields(logrus.Fields{
-        "type": vlv.config.GetType(), "startVUs": vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple), "maxVUs": maxVUs,
-        "duration": regularDuration, "numStages": len(vlv.config.Stages),
-    },
-    ).Debug("Starting executor run...")
-
-    activeVUsCount := new(int64)
-    vusFmt := pb.GetFixedLengthIntFormat(int64(maxVUs))
-    regularDurationStr := pb.GetFixedLengthDuration(regularDuration, regularDuration)
-    progressFn := func() (float64, []string) {
-        spent := time.Since(startTime)
-        currentlyActiveVUs := atomic.LoadInt64(activeVUsCount)
-        vus := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
-        if spent > regularDuration {
-            return 1, []string{vus, regularDuration.String()}
-        }
-        progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currentlyActiveVUs, maxVUs)
-        progDur := pb.GetFixedLengthDuration(spent, regularDuration) + "/" + regularDurationStr
-        return float64(spent) / float64(regularDuration), []string{progVUs, progDur}
+        "type":      vlv.config.GetType(),
+        "startVUs":  vlv.config.GetStartVUs(vlv.executionState.ExecutionTuple),
+        "maxVUs":    startMaxVUs,
+        "duration":  regDur,
+        "numStages": len(vlv.config.Stages),
+    }).Debug("Starting executor run...")
+
+    runState := &rampingVUsRunState{
+        executor:       vlv,
+        wg:             new(sync.WaitGroup),
+        vuHandles:      make([]*vuHandle, startMaxVUs),
+        maxVUs:         startMaxVUs,
+        activeVUsCount: new(int64),
+        started:        startTime,
+        rawSteps:       rawSteps,
+        gracefulSteps:  gracefulSteps,
+        runIteration:   getIterationRunner(vlv.executionState, vlv.logger),
     }
-    vlv.progress.Modify(pb.WithProgress(progressFn))
-    go trackProgress(parentCtx, maxDurationCtx, regDurationCtx, vlv, progressFn)

-    // Actually schedule the VUs and iterations, likely the most complicated
-    // executor among all of them...
-    runIteration := getIterationRunner(vlv.executionState, vlv.logger)
-    getVU := func() (lib.InitializedVU, error) {
-        initVU, err := vlv.executionState.GetPlannedVU(vlv.logger, false)
-        if err != nil {
-            vlv.logger.WithError(err).Error("Cannot get a VU from the buffer")
-            cancel()
-        } else {
-            activeVUs.Add(1)
-            atomic.AddInt64(activeVUsCount, 1)
-            vlv.executionState.ModCurrentlyActiveVUsCount(+1)
-        }
-        return initVU, err
-    }
-    returnVU := func(initVU lib.InitializedVU) {
-        vlv.executionState.ReturnVU(initVU, false)
-        atomic.AddInt64(activeVUsCount, -1)
-        activeVUs.Done()
-        vlv.executionState.ModCurrentlyActiveVUsCount(-1)
-    }
-
-    maxDurationCtx = lib.WithScenarioState(maxDurationCtx, &lib.ScenarioState{
+    progressFn := runState.makeProgressFn(regDur)
+    maxDurCtx = lib.WithScenarioState(maxDurCtx, &lib.ScenarioState{
         Name:       vlv.config.Name,
         Executor:   vlv.config.Type,
-        StartTime:  startTime,
+        StartTime:  runState.started,
         ProgressFn: progressFn,
     })
+    vlv.progress.Modify(pb.WithProgress(progressFn))
+    go trackProgress(ctx, maxDurCtx, regDurCtx, vlv, progressFn)

-    vuHandles := make([]*vuHandle, maxVUs)
-    for i := uint64(0); i < maxVUs; i++ {
-        vuHandle := newStoppedVUHandle(
-            maxDurationCtx, getVU, returnVU, vlv.nextIterationCounters,
-            &vlv.config.BaseConfig, vlv.logger.WithField("vuNum", i))
-        go vuHandle.runLoopsIfPossible(runIteration)
-        vuHandles[i] = vuHandle
+    defer runState.wg.Wait()
+    runState.populateVUHandles(maxDurCtx, cancel)
+    for i := uint64(0); i < runState.maxVUs; i++ {
+        go runState.vuHandles[i].runLoopsIfPossible(runState.runIteration)
     }
+    runState.handleVUs(ctx)
+    go runState.handleRemainingVUs(ctx)

-    // 0 <= currentScheduledVUs <= currentMaxAllowedVUs <= maxVUs
-    var currentScheduledVUs, currentMaxAllowedVUs uint64
+    return nil
+}

-    handleNewScheduledVUs := func(newScheduledVUs uint64) {
-        if newScheduledVUs > currentScheduledVUs {
-            for vuNum := currentScheduledVUs; vuNum < newScheduledVUs; vuNum++ {
-                _ = vuHandles[vuNum].start() // TODO handle error
-            }
-        } else {
-            for vuNum := newScheduledVUs; vuNum < currentScheduledVUs; vuNum++ {
-                vuHandles[vuNum].gracefulStop()
-            }
+// rampingVUsRunState is created and initialized by the Run() method
+// of the ramping VUs executor. It is used to track and modify various
+// details of the execution.
+type rampingVUsRunState struct {
+    executor       RampingVUs
+    vuHandles      []*vuHandle // handles for manipulating and tracking all of the VUs
+    maxVUs         uint64      // the scaled number of initially configured MaxVUs
+    activeVUsCount *int64      // the current number of active VUs, used only for the progress display
+    started        time.Time
+    rawSteps, gracefulSteps []lib.ExecutionStep
+    wg             *sync.WaitGroup
+
+    runIteration func(context.Context, lib.ActiveVU) bool // a helper closure function that runs a single iteration
+}

+func (rs rampingVUsRunState) makeProgressFn(total time.Duration) (progressFn func() (float64, []string)) {
+    vusFmt := pb.GetFixedLengthIntFormat(int64(rs.maxVUs))
+    regDuration := pb.GetFixedLengthDuration(total, total)
+
+    return func() (float64, []string) {
+        spent := time.Since(rs.started)
+        cur := atomic.LoadInt64(rs.activeVUsCount)
+        progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", cur, rs.maxVUs)
+        if spent > total {
+            return 1, []string{progVUs, total.String()}
         }
-        currentScheduledVUs = newScheduledVUs
+        progDur := pb.GetFixedLengthDuration(spent, total) + "/" + regDuration
+        return float64(spent) / float64(total), []string{progVUs, progDur}
     }
+}

-    handleNewMaxAllowedVUs := func(newMaxAllowedVUs uint64) {
-        if newMaxAllowedVUs < currentMaxAllowedVUs {
-            for vuNum := newMaxAllowedVUs; vuNum < currentMaxAllowedVUs; vuNum++ {
-                vuHandles[vuNum].hardStop()
-            }
+func (rs rampingVUsRunState) populateVUHandles(ctx context.Context, cancel func()) {
+    getVU := func() (lib.InitializedVU, error) {
+        pvu, err := rs.executor.executionState.GetPlannedVU(rs.executor.logger, false)
+        if err != nil {
+            rs.executor.logger.WithError(err).Error("Cannot get a VU from the buffer")
+            cancel()
+            return pvu, err
         }
-        currentMaxAllowedVUs = newMaxAllowedVUs
+        rs.wg.Add(1)
+        atomic.AddInt64(rs.activeVUsCount, 1)
+        rs.executor.executionState.ModCurrentlyActiveVUsCount(+1)
+        return pvu, err
     }
+    returnVU := func(initVU lib.InitializedVU) {
+        rs.executor.executionState.ReturnVU(initVU, false)
+        atomic.AddInt64(rs.activeVUsCount, -1)
+        rs.wg.Done()
+        rs.executor.executionState.ModCurrentlyActiveVUsCount(-1)
+    }
+    for i := uint64(0); i < rs.maxVUs; i++ {
+        rs.vuHandles[i] = newStoppedVUHandle(
+            ctx, getVU, returnVU, rs.executor.nextIterationCounters,
+            &rs.executor.config.BaseConfig, rs.executor.logger.WithField("vuNum", i))
+    }
+}

-    wait := waiter(parentCtx, startTime)
-    // iterate over rawExecutionSteps and gracefulExecutionSteps in order by TimeOffset
-    // giving rawExecutionSteps precedence.
-    // we stop iterating once rawExecutionSteps are over as we need to run the remaining
-    // gracefulExecutionSteps concurrently while waiting for VUs to stop in order to not wait until
-    // the end of gracefulStop timeouts
-    i, j := 0, 0
-    for i != len(rawExecutionSteps) {
-        if rawExecutionSteps[i].TimeOffset > gracefulExecutionSteps[j].TimeOffset {
-            if wait(gracefulExecutionSteps[j].TimeOffset) {
+func (rs rampingVUsRunState) handleVUs(ctx context.Context) {
+    // iterate over rawSteps and gracefulSteps in order by TimeOffset,
+    // giving rawSteps precedence.
+    // we stop iterating once rawSteps are over, as we need to run the remaining
+    // gracefulSteps concurrently while waiting for VUs to stop, in order not to wait
+    // until the end of the gracefulStop (= maxDur-regDur) timeouts
+    var (
+        handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
+        handleNewScheduledVUs  = rs.scheduledVUsHandlerStrategy()
+        wait                   = waiter(ctx, rs.started)
+    )
+    for i, j := 0, 0; i != len(rs.rawSteps); {
+        r, g := rs.rawSteps[i], rs.gracefulSteps[j]
+        if g.TimeOffset < r.TimeOffset {
+            j++
+            if wait(g.TimeOffset) {
                 return
             }
-            handleNewMaxAllowedVUs(gracefulExecutionSteps[j].PlannedVUs)
-            j++
+            handleNewMaxAllowedVUs(g)
         } else {
-            if wait(rawExecutionSteps[i].TimeOffset) {
+            i++
+            if wait(r.TimeOffset) {
                 return
             }
-            handleNewScheduledVUs(rawExecutionSteps[i].PlannedVUs)
-            i++
+            handleNewScheduledVUs(r)
         }
     }
+}

-    go func() { // iterate over the remaining gracefulExecutionSteps
-        for _, step := range gracefulExecutionSteps[j:] {
-            if wait(step.TimeOffset) {
-                return
-            }
-            handleNewMaxAllowedVUs(step.PlannedVUs)
+// TODO: removing this has no effect on tests?
+func (rs rampingVUsRunState) handleRemainingVUs(ctx context.Context) {
+    var (
+        handleNewMaxAllowedVUs = rs.maxAllowedVUsHandlerStrategy()
+        wait                   = waiter(ctx, rs.started)
+    )
+    for _, s := range rs.gracefulSteps {
+        if wait(s.TimeOffset) {
+            return
+        }
-    }()
+        handleNewMaxAllowedVUs(s)
     }
+}

-    return nil
+func (rs rampingVUsRunState) maxAllowedVUsHandlerStrategy() func(lib.ExecutionStep) {
+    var cur uint64
+    return func(graceful lib.ExecutionStep) {
+        pv := graceful.PlannedVUs
+        for ; pv < cur; cur-- {
+            rs.vuHandles[cur-1].hardStop()
+        }
+        cur = pv
+    }
+}
+
+func (rs rampingVUsRunState) scheduledVUsHandlerStrategy() func(lib.ExecutionStep) {
+    var cur uint64
+    return func(raw lib.ExecutionStep) {
+        pv := raw.PlannedVUs
+        for ; cur < pv; cur++ {
+            _ = rs.vuHandles[cur].start() // TODO: handle the error
+        }
+        for ; pv < cur; cur-- {
+            rs.vuHandles[cur-1].gracefulStop()
+        }
+        cur = pv
+    }
+}

// waiter returns a function that will sleep/wait for the required time since the startTime and then
@@ -651,9 +682,9 @@ func (vlv RampingVUs) Run(
 func waiter(ctx context.Context, startTime time.Time) func(offset time.Duration) bool {
     timer := time.NewTimer(time.Hour * 24)
     return func(offset time.Duration) bool {
-        offsetDiff := offset - time.Since(startTime)
-        if offsetDiff > 0 { // wait until time of event arrives // TODO have a minimum
-            timer.Reset(offsetDiff)
+        diff := offset - time.Since(startTime)
+        if diff > 0 { // wait until time of event arrives // TODO have a minimum
+            timer.Reset(diff)
             select {
             case <-ctx.Done():
                 return true // exit if context is cancelled
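
Note: the waiter helper above is shown truncated. Below is a close,
runnable analogue of how the step-handling loops use it; everything in
it is illustrative (the step type and the body past the visible lines
are reconstructed for this sketch, not taken from the commit):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // step mirrors the shape of lib.ExecutionStep for this sketch.
    type step struct {
        TimeOffset time.Duration
        PlannedVUs uint64
    }

    // waiter reuses a single timer across waits and reports true if ctx
    // was cancelled before startTime+offset arrived.
    func waiter(ctx context.Context, startTime time.Time) func(time.Duration) bool {
        timer := time.NewTimer(24 * time.Hour)
        return func(offset time.Duration) bool {
            diff := offset - time.Since(startTime)
            if diff > 0 {
                timer.Reset(diff)
                select {
                case <-ctx.Done():
                    return true
                case <-timer.C:
                }
            }
            return false
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
        defer cancel()

        steps := []step{{0, 1}, {100 * time.Millisecond, 2}, {500 * time.Millisecond, 3}}
        wait := waiter(ctx, time.Now())
        for _, s := range steps {
            if wait(s.TimeOffset) {
                fmt.Println("cancelled before offset", s.TimeOffset)
                return
            }
            fmt.Println("apply step at", s.TimeOffset, "planned VUs:", s.PlannedVUs)
        }
    }

Reusing one timer avoids allocating a timer per step, and the context
check makes every wait a cancellation point, which is why handleVUs can
simply return when wait reports true.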
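The handleVUs loop above is a two-pointer merge of two step lists that
are each sorted by TimeOffset, with raw steps taking precedence on equal
offsets. A minimal runnable sketch of just that traversal (illustrative
types; it assumes the graceful list extends at least as far as the raw
list, as it does in the executor):

    package main

    import "fmt"

    // step stands in for lib.ExecutionStep.
    type step struct {
        offset  int // stand-in for TimeOffset
        planned uint64
    }

    // mergeSteps visits raw and graceful steps in offset order, giving
    // raw steps precedence on ties, and stops once the raw steps are
    // exhausted -- the same traversal handleVUs performs.
    func mergeSteps(raw, graceful []step, onRaw, onGraceful func(step)) {
        for i, j := 0, 0; i != len(raw); {
            r, g := raw[i], graceful[j]
            if g.offset < r.offset {
                j++
                onGraceful(g)
            } else {
                i++
                onRaw(r)
            }
        }
    }

    func main() {
        raw := []step{{0, 1}, {10, 2}, {20, 1}}
        graceful := []step{{0, 1}, {10, 2}, {25, 1}, {30, 0}}
        mergeSteps(raw, graceful,
            func(s step) { fmt.Println("raw     ", s.offset, s.planned) },
            func(s step) { fmt.Println("graceful", s.offset, s.planned) })
        // The remaining graceful steps ({25,1} and {30,0}) are left for a
        // separate pass, which is what handleRemainingVUs does.
    }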

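Finally, populateVUHandles wires each handle to an acquire/release pair
(getVU/returnVU) that keeps three counters in sync: the WaitGroup that
Run defers on, the atomic counter the progress bar reads, and the
execution state's active-VU count. A stripped-down sketch of that
accounting pattern (hypothetical names, no k6 types):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // vuAccounting pairs acquire/release callbacks so that every
    // successful acquire is mirrored by exactly one release, as
    // getVU and returnVU do in populateVUHandles.
    type vuAccounting struct {
        wg     sync.WaitGroup
        active int64
    }

    func (a *vuAccounting) acquire() {
        a.wg.Add(1)
        atomic.AddInt64(&a.active, 1)
    }

    func (a *vuAccounting) release() {
        atomic.AddInt64(&a.active, -1)
        a.wg.Done()
    }

    func main() {
        var acc vuAccounting
        for i := 0; i < 3; i++ {
            acc.acquire()
        }
        fmt.Println("active:", atomic.LoadInt64(&acc.active)) // 3
        for i := 0; i < 3; i++ {
            acc.release()
        }
        acc.wg.Wait() // returns immediately: every acquire was released
        fmt.Println("active:", atomic.LoadInt64(&acc.active)) // 0
    }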