Skip to content

Commit

Permalink
Introduce Pacer interface, ConstantPacer and SinePacer. (tsenart#400)
Browse files Browse the repository at this point in the history
* Introduce Pacer interface and ConstantPacer.

The Pacer interface allows vegeta's attack rate to be customized.
Attack code keeps track of the elapsed attack duration and the number
of hits sent and provides these to Pace, which must return a Duration
for the attack code to sleep for before sending the next hit. Pace may
also return true for the second stop value to terminate the attack.

This interface allows vegeta to model a wider range of attack scenarios,
such as diurnal request patterns or load-test behaviour, where the
attack rate increases until the target begins to serve errors.

* Add SinePacer, for sinusoidal attack rates.

SinePacer is a Pacer that describes attack request rates with the
equation Rate = Mean * Amplitude * sin(Offset + t * (2𝛑 / Period)).

Both the Mean and Amplitude rates are expressed with ConstantPacer.
  • Loading branch information
fluffle authored and tsenart committed May 12, 2019
1 parent e827e02 commit d48d090
Show file tree
Hide file tree
Showing 4 changed files with 458 additions and 24 deletions.
2 changes: 1 addition & 1 deletion attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type attackOpts struct {
// attack validates the attack arguments, sets up the
// required resources, launches the attack and writes the results
func attack(opts *attackOpts) (err error) {
if opts.rate.IsZero() {
if opts.rate.Per <= 0 || opts.rate.Freq <= 0 {
return errZeroRate
}

Expand Down
38 changes: 15 additions & 23 deletions lib/attack.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,25 +218,14 @@ func Client(c *http.Client) func(*Attacker) {
return func(a *Attacker) { a.client = *c }
}

// A Rate of hits during an Attack.
type Rate struct {
Freq int // Frequency (number of occurrences) per ...
Per time.Duration // Time unit, usually 1s
}

// IsZero returns true if either Freq or Per are zero valued.
func (r Rate) IsZero() bool {
return r.Freq == 0 || r.Per == 0
}

// Attack reads its Targets from the passed Targeter and attacks them at
// the rate specified for the given duration. When the duration is zero the attack
// the rate specified by the Pacer. When the duration is zero the attack
// runs until Stop is called. Results are sent to the returned channel as soon
// as they arrive and will have their Attack field set to the given name.
func (a *Attacker) Attack(tr Targeter, r Rate, du time.Duration, name string) <-chan *Result {
func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
var workers sync.WaitGroup
results := make(chan *Result)
ticks := make(chan uint64)
ticks := make(chan struct{})
for i := uint64(0); i < a.workers; i++ {
workers.Add(1)
go a.attack(tr, name, &workers, ticks, results)
Expand All @@ -246,17 +235,20 @@ func (a *Attacker) Attack(tr Targeter, r Rate, du time.Duration, name string) <-
defer close(results)
defer workers.Wait()
defer close(ticks)
interval := uint64(r.Per.Nanoseconds() / int64(r.Freq))
hits := uint64(du) / interval
began, count := time.Now(), uint64(0)
for {
now, next := time.Now(), began.Add(time.Duration(count*interval))
time.Sleep(next.Sub(now))
elapsed := time.Since(began)
if du > 0 && elapsed > du {
return
}
wait, stop := p.Pace(elapsed, count)
if stop {
return
}
time.Sleep(wait)
select {
case ticks <- count:
if count++; count == hits {
return
}
case ticks <- struct{}{}:
count++
case <-a.stopch:
return
default: // all workers are blocked. start one more and try again
Expand All @@ -279,7 +271,7 @@ func (a *Attacker) Stop() {
}
}

func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan uint64, results chan<- *Result) {
func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
defer workers.Done()
for range ticks {
results <- a.hit(tr, name)
Expand Down
216 changes: 216 additions & 0 deletions lib/pacer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package vegeta

import (
"fmt"
"math"
"time"
)

// A Pacer defines the rate of hits during an Attack by
// returning the duration an Attacker should wait until
// hitting the next Target. If the second return value
// is true, the attack will terminate.
type Pacer interface {
Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)
}

// A PacerFunc is a function adapter type that implements
// the Pacer interface.
type PacerFunc func(time.Duration, uint64) (time.Duration, bool)

// Pace implements the Pacer interface.
func (pf PacerFunc) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
return pf(elapsed, hits)
}

// A ConstantPacer defines a constant rate of hits for the target.
type ConstantPacer struct {
Freq int // Frequency (number of occurrences) per ...
Per time.Duration // Time unit, usually 1s
}

// Rate is a type alias for ConstantPacer for backwards-compatibility.
type Rate = ConstantPacer

// ConstantPacer satisfies the Pacer interface.
var _ Pacer = ConstantPacer{}

// String returns a pretty-printed description of the ConstantPacer's behaviour:
// ConstantPacer{Freq: 1, Per: time.Second} => Constant{1 hits/1s}
func (cp ConstantPacer) String() string {
return fmt.Sprintf("Constant{%d hits/%s}", cp.Freq, cp.Per)
}

// Pace determines the length of time to sleep until the next hit is sent.
func (cp ConstantPacer) Pace(elapsed time.Duration, hits uint64) (time.Duration, bool) {
if cp.Per <= 0 || cp.Freq <= 0 {
// If pacer configuration is invalid, stop the attack.
return 0, true
}
expectedHits := uint64(cp.Freq) * uint64(elapsed/cp.Per)
if hits < expectedHits {
// Running behind, send next hit immediately.
return 0, false
}
interval := uint64(cp.Per.Nanoseconds() / int64(cp.Freq))
if math.MaxInt64/interval < hits {
// We would overflow delta if we continued, so stop the attack.
return 0, true
}
delta := time.Duration((hits + 1) * interval)
// Zero or negative durations cause time.Sleep to return immediately.
return delta - elapsed, false
}

// hitsPerNs returns the attack rate this ConstantPacer represents, in
// fractional hits per nanosecond.
func (cp ConstantPacer) hitsPerNs() float64 {
return float64(cp.Freq) / float64(cp.Per)
}

const (
// MeanUp is a SinePacer Offset that causes the attack to start
// at the Mean attack rate and increase towards the peak.
MeanUp float64 = 0
// Peak is a SinePacer Offset that causes the attack to start
// at the peak (maximum) attack rate and decrease towards the Mean.
Peak = math.Pi / 2
// MeanDown is a SinePacer Offset that causes the attack to start
// at the Mean attack rate and decrease towards the trough.
MeanDown = math.Pi
// Trough is a SinePacer Offset that causes the attack to start
// at the trough (minimum) attack rate and increase towards the Mean.
Trough = 3 * math.Pi / 2
)

// SinePacer is a Pacer that describes attack request rates with the equation:
// R = MA sin(O+(2𝛑/P)t)
// Where:
// R = Instantaneous attack rate at elapsed time t, hits per nanosecond
// M = Mean attack rate over period P, sp.Mean, hits per nanosecond
// A = Amplitude of sine wave, sp.Amp, hits per nanosecond
// O = Offset of sine wave, sp.StartAt, radians
// P = Period of sine wave, sp.Period, nanoseconds
// t = Elapsed time since attack start, nanoseconds
//
// Many thanks to http://ascii.co.uk/art/sine and "sps" for the ascii here :-)
//
// Mean -| ,-'''-.
// +Amp | ,-' | `-.
// | ,' | `. O=𝛑
// | ,' O=𝛑/2 `. MeanDown
// | / Peak \ /
// |/ \ /
// Mean -+-------------------------\--------------------------> t
// |\ \ /
// | \ \ O=3𝛑/2 /
// | O=0 `. Trough ,'
// | MeanUp `. | ,'
// Mean | `-. | ,-'
// -Amp -| `-,,,-'
// |<-------------------- Period --------------------->|
//
// This equation is integrated with respect to time to derive the expected
// number of hits served at time t after the attack began:
// H = Mt - (AP/2𝛑)cos(O+(2𝛑/P)t) + (AP/2𝛑)cos(O)
// Where:
// H = Total number of hits triggered during t
type SinePacer struct {
// The period of the sine wave, e.g. 20*time.Minute
// MUST BE > 0
Period time.Duration
// The mid-point of the sine wave in freq-per-Duration,
// MUST BE > 0
Mean Rate
// The amplitude of the sine wave in freq-per-Duration,
// MUST NOT BE EQUAL TO OR LARGER THAN MEAN
Amp Rate
// The offset, in radians, for the sine wave at t=0.
StartAt float64
}

// SinePacer satisfies the Pacer interface.
var _ Pacer = SinePacer{}

// String returns a pretty-printed description of the SinePacer's behaviour:
// SinePacer{
// Period: time.Hour,
// Mean: Rate{100, time.Second},
// Amp: Rate{50, time.Second},
// StartAt: MeanDown,
// } =>
// Sine{Constant{100 hits/1s} ± Constant{50 hits/1s} / 1h, offset 1𝛑}
func (sp SinePacer) String() string {
return fmt.Sprintf("Sine{%s ± %s / %s, offset %g𝛑}", sp.Mean, sp.Amp, sp.Period, sp.StartAt/math.Pi)
}

// invalid tests the constraints documented in the SinePacer struct definition.
func (sp SinePacer) invalid() bool {
return sp.Period <= 0 || sp.Mean.hitsPerNs() <= 0 || sp.Amp.hitsPerNs() >= sp.Mean.hitsPerNs()
}

// Pace determines the length of time to sleep until the next hit is sent.
func (sp SinePacer) Pace(elapsedTime time.Duration, elapsedHits uint64) (time.Duration, bool) {
if sp.invalid() {
// If the SinePacer configuration is invalid, stop the attack.
return 0, true
}
expectedHits := sp.hits(elapsedTime)
if elapsedHits < uint64(expectedHits) {
// Running behind, send next hit immediately.
return 0, false
}
// Re-arranging our hits equation to provide a duration given the number of
// requests sent is non-trivial, so we must solve for the duration numerically.
// math.Round() added here because we have to coerce to int64 nanoseconds
// at some point and it corrects a bunch of off-by-one problems.
nsPerHit := math.Round(1 / sp.hitsPerNs(elapsedTime))
hitsToWait := float64(elapsedHits+1) - expectedHits
nextHitIn := time.Duration(nsPerHit * hitsToWait)

// If we can't converge to an error of <1e-3 within 5 iterations, bail.
// This rarely even loops for any large Period if hitsToWait is small.
for i := 0; i < 5; i++ {
hitsAtGuess := sp.hits(elapsedTime + nextHitIn)
err := float64(elapsedHits+1) - hitsAtGuess
if math.Abs(err) < 1e-3 {
return nextHitIn, false
}
nextHitIn = time.Duration(float64(nextHitIn) / (hitsAtGuess - float64(elapsedHits)))
}
return nextHitIn, false
}

// ampHits returns AP/2𝛑, which is the number of hits added or subtracted
// from the Mean due to the Amplitude over a quarter of the Period,
// i.e. from 0 → 𝛑/2 radians
func (sp SinePacer) ampHits() float64 {
return (sp.Amp.hitsPerNs() * float64(sp.Period)) / (2 * math.Pi)
}

// radians converts the elapsed attack time to a radian value.
// The elapsed time t is divided by the wave period, multiplied by 2𝛑 to
// convert to radians, and offset by StartAt radians.
func (sp SinePacer) radians(t time.Duration) float64 {
return sp.StartAt + float64(t)*2*math.Pi/float64(sp.Period)
}

// hitsPerNs calculates the instantaneous rate of attack at
// t nanoseconds after the attack began.
// R = MA sin(O+(2𝛑/P)t)
func (sp SinePacer) hitsPerNs(t time.Duration) float64 {
return sp.Mean.hitsPerNs() + sp.Amp.hitsPerNs()*math.Sin(sp.radians(t))
}

// hits returns the number of hits that have been sent during an attack
// lasting t nanoseconds. It returns a float so we can tell exactly how
// much we've missed our target by when solving numerically in Pace.
// H = Mt - (AP/2𝛑)cos(O+(2𝛑/P)t) + (AP/2𝛑)cos(O)
// This re-arranges to:
// H = Mt + (AP/2𝛑)(cos(O) - cos(O+(2𝛑/P)t))
func (sp SinePacer) hits(t time.Duration) float64 {
if t <= 0 || sp.invalid() {
return 0
}
return sp.Mean.hitsPerNs()*float64(t) + sp.ampHits()*(math.Cos(sp.StartAt)-math.Cos(sp.radians(t)))
}
Loading

0 comments on commit d48d090

Please sign in to comment.