Skip to content

Commit

Permalink
stop netem by signal
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexei Ledenev committed Aug 5, 2016
1 parent 3385e0e commit 4f09a22
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 57 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## [v0.2.2] - 2016-08-04
## Changed
- `--interval` flag is optional now; if missing Pumba will do single chaos action and exit


## [v0.2.0] - 2016-07-20
### Added
- Network emulation for egress container traffic, powered by [netem](http://www.linuxfoundation.org/collaborate/workgroups/networking/netem)
Expand Down
60 changes: 46 additions & 14 deletions action/chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type CommandNetemDelay struct {
Amount int
Variation int
Correlation int
StopChan <-chan bool
}

// CommandStop arguments for stop command
Expand All @@ -67,8 +68,14 @@ type Chaos interface {
PauseContainers(container.Client, []string, string, interface{}) error
}

// Pumba makes Chaos
type Pumba struct{}
// NewChaos create new Pumba Choas instance
func NewChaos() Chaos {
return pumbaChaos{}
}

// pumba makes Chaos
type pumbaChaos struct {
}

// all containers beside Pumba and PumbaSkip
func allContainersFilter(c container.Container) bool {
Expand Down Expand Up @@ -228,30 +235,55 @@ func pauseContainers(client container.Client, containers []container.Container,
return nil
}

func netemContainers(client container.Client, containers []container.Container, netInterface string, netemCmd string, ip net.IP, duration time.Duration) error {
func netemContainers(client container.Client, containers []container.Container, netInterface string, netemCmd string, ip net.IP, duration time.Duration, stopChan <-chan bool) error {
var err error
netemContainers := []container.Container{}
if RandomMode {
container := randomContainer(containers)
if container != nil {
err := client.NetemContainer(*container, netInterface, netemCmd, ip, duration, DryMode)
err = client.NetemContainer(*container, netInterface, netemCmd, ip, duration, DryMode)
if err != nil {
return err
}
netemContainers = append(netemContainers, *container)
}
} else {
for _, container := range containers {
err := client.NetemContainer(container, netInterface, netemCmd, ip, duration, DryMode)
err = client.NetemContainer(container, netInterface, netemCmd, ip, duration, DryMode)
if err != nil {
return err
break
} else {
netemContainers = append(netemContainers, container)
}
}
}
return nil
// wait for specified duration and then stop netem (where it applied) or stop on signal to stopChan channel
select {
case <-stopChan:
log.Debugf("Stopping netem by stop event")
err = stopNetemContainers(client, netemContainers, netInterface)
case <-time.After(duration):
log.Debugf("Stopping netem after: %s", duration)
err = stopNetemContainers(client, netemContainers, netInterface)
}

return err
}

func stopNetemContainers(client container.Client, containers []container.Container, netInterface string) error {
var err error
for _, container := range containers {
if e := client.StopNetemContainer(container, netInterface, DryMode); e != nil {
err = e
}
}
return err // last non nil error
}

//---------------------------------------------------------------------------------------------------

// StopContainers stop containers matching pattern
func (p Pumba) StopContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
func (p pumbaChaos) StopContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Info("Stop containers")
// get command details
command, ok := cmd.(CommandStop)
Expand All @@ -267,7 +299,7 @@ func (p Pumba) StopContainers(client container.Client, names []string, pattern s
}

// KillContainers - kill containers either by RE2 pattern (if specified) or by names
func (p Pumba) KillContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
func (p pumbaChaos) KillContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Info("Kill containers")
// get command details
command, ok := cmd.(CommandKill)
Expand All @@ -283,7 +315,7 @@ func (p Pumba) KillContainers(client container.Client, names []string, pattern s
}

// RemoveContainers - remove container either by RE2 pattern (if specified) or by names
func (p Pumba) RemoveContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
func (p pumbaChaos) RemoveContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Info("Remove containers")
// get command details
command, ok := cmd.(CommandRemove)
Expand All @@ -299,8 +331,8 @@ func (p Pumba) RemoveContainers(client container.Client, names []string, pattern
}

// NetemDelayContainers delay network traffic with optional variation and correlation
func (p Pumba) NetemDelayContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Info("netem dealy for containers")
func (p pumbaChaos) NetemDelayContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Info("netem: dealy for containers")
// get command details
command, ok := cmd.(CommandNetemDelay)
if !ok {
Expand All @@ -319,11 +351,11 @@ func (p Pumba) NetemDelayContainers(client container.Client, names []string, pat
netemCmd += " " + strconv.Itoa(command.Correlation) + "%"
}

return netemContainers(client, containers, command.NetInterface, netemCmd, command.IP, command.Duration)
return netemContainers(client, containers, command.NetInterface, netemCmd, command.IP, command.Duration, command.StopChan)
}

// PauseContainers pause container,if its name within `names`, for specified interval
func (p Pumba) PauseContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
func (p pumbaChaos) PauseContainers(client container.Client, names []string, pattern string, cmd interface{}) error {
log.Infof("Pause containers")
// get command details
command, ok := cmd.(CommandPause)
Expand Down
65 changes: 45 additions & 20 deletions action/chaos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ func TestStopByName(t *testing.T) {
client.On("StopContainer", c, 10).Return(nil)
}
// doc action
err := Pumba{}.StopContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.StopContainers(client, names, "", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -203,7 +204,8 @@ func TestStopByNameRandom(t *testing.T) {
client.On("StopContainer", mock.AnythingOfType("container.Container"), 10).Return(nil)
// do action
RandomMode = true
err := Pumba{}.StopContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.StopContainers(client, names, "", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -220,7 +222,8 @@ func TestStopByPattern(t *testing.T) {
client.On("StopContainer", c, 10).Return(nil)
}
// do action
err := Pumba{}.StopContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.StopContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -235,7 +238,8 @@ func TestStopByPatternRandom(t *testing.T) {
client.On("StopContainer", mock.AnythingOfType("container.Container"), 10).Return(nil)
// do action
RandomMode = true
err := Pumba{}.StopContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.StopContainers(client, []string{}, "^c", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -252,7 +256,8 @@ func TestKillByName(t *testing.T) {
client.On("KillContainer", c, "SIGTEST").Return(nil)
}
// do action
err := Pumba{}.KillContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.KillContainers(client, names, "", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -267,7 +272,8 @@ func TestKillByNameRandom(t *testing.T) {
client.On("KillContainer", mock.AnythingOfType("container.Container"), "SIGTEST").Return(nil)
// do action
RandomMode = true
err := Pumba{}.KillContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.KillContainers(client, names, "", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -284,7 +290,8 @@ func TestKillByPattern(t *testing.T) {
client.On("KillContainer", cs[i], "SIGTEST").Return(nil)
}
// do action
err := Pumba{}.KillContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.KillContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -299,7 +306,8 @@ func TestKillByPatternRandom(t *testing.T) {
client.On("KillContainer", mock.AnythingOfType("container.Container"), "SIGTEST").Return(nil)
// do action
RandomMode = true
err := Pumba{}.KillContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.KillContainers(client, []string{}, "^c", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -314,7 +322,8 @@ func TestRemoveByName(t *testing.T) {
for _, c := range cs {
client.On("RemoveContainer", c, false, false, false).Return(nil)
}
err := Pumba{}.RemoveContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.RemoveContainers(client, names, "", cmd)
assert.NoError(t, err)
client.AssertExpectations(t)
}
Expand All @@ -328,7 +337,8 @@ func TestRemoveByNameRandom(t *testing.T) {
client.On("RemoveContainer", mock.AnythingOfType("container.Container"), false, true, true).Return(nil)
// do action
RandomMode = true
err := Pumba{}.RemoveContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.RemoveContainers(client, names, "", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -345,7 +355,8 @@ func TestRemoveByPattern(t *testing.T) {
client.On("RemoveContainer", c, false, true, true).Return(nil)
}
// do action
err := Pumba{}.RemoveContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.RemoveContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -360,7 +371,8 @@ func TestRemoveByPatternRandom(t *testing.T) {
client.On("RemoveContainer", mock.AnythingOfType("container.Container"), false, true, true).Return(nil)
// do action
RandomMode = true
err := Pumba{}.RemoveContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.RemoveContainers(client, []string{}, "^c", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -377,7 +389,8 @@ func TestPauseByName(t *testing.T) {
client.On("PauseContainer", c, 2*time.Millisecond).Return(nil)
}
// do action
err := Pumba{}.PauseContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.PauseContainers(client, names, "", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -393,7 +406,8 @@ func TestPauseByPattern(t *testing.T) {
client.On("PauseContainer", c, 2*time.Millisecond).Return(nil)
}
// do action
err := Pumba{}.PauseContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.PauseContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -408,7 +422,8 @@ func TestPauseByNameRandom(t *testing.T) {
client.On("PauseContainer", mock.AnythingOfType("container.Container"), 2*time.Millisecond).Return(nil)
// do action
RandomMode = true
err := Pumba{}.PauseContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.PauseContainers(client, names, "", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -430,9 +445,11 @@ func TestNetemDealyByName(t *testing.T) {
client.On("ListContainers", mock.AnythingOfType("container.Filter")).Return(cs, nil)
for _, c := range cs {
client.On("NetemContainer", c, "eth1", "delay 120ms 25ms 15%", net.ParseIP(""), 1*time.Second).Return(nil)
client.On("StopNetemContainer", c, "eth1").Return(nil)
}
// do action
err := Pumba{}.NetemDelayContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.NetemDelayContainers(client, names, "", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -452,9 +469,11 @@ func TestNetemDealyByNameRandom(t *testing.T) {
client := container.NewMockSamalbaClient()
client.On("ListContainers", mock.AnythingOfType("container.Filter")).Return(cs, nil)
client.On("NetemContainer", mock.AnythingOfType("container.Container"), "eth1", "delay 120ms 25ms 15%", net.ParseIP(""), 1*time.Second).Return(nil)
client.On("StopNetemContainer", mock.AnythingOfType("container.Container"), "eth1").Return(nil)
// do action
RandomMode = true
err := Pumba{}.NetemDelayContainers(client, names, "", cmd)
pumba := pumbaChaos{}
err := pumba.NetemDelayContainers(client, names, "", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand All @@ -476,9 +495,11 @@ func TestNetemDealyByPattern(t *testing.T) {
client.On("ListContainers", mock.AnythingOfType("container.Filter")).Return(cs, nil)
for _, c := range cs {
client.On("NetemContainer", c, "eth1", "delay 120ms 25ms 15%", net.ParseIP(""), 1*time.Second).Return(nil)
client.On("StopNetemContainer", c, "eth1").Return(nil)
}
// do action
err := Pumba{}.NetemDelayContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.NetemDelayContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -500,9 +521,11 @@ func TestNetemDealyByPatternIPFilter(t *testing.T) {
client.On("ListContainers", mock.AnythingOfType("container.Filter")).Return(cs, nil)
for _, c := range cs {
client.On("NetemContainer", c, "eth1", "delay 120ms 25ms 15%", ip, 1*time.Second).Return(nil)
client.On("StopNetemContainer", c, "eth1").Return(nil)
}
// do action
err := Pumba{}.NetemDelayContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.NetemDelayContainers(client, []string{}, "^c", cmd)
// asserts
assert.NoError(t, err)
client.AssertExpectations(t)
Expand All @@ -522,9 +545,11 @@ func TestNetemDealyByPatternRandom(t *testing.T) {
client := container.NewMockSamalbaClient()
client.On("ListContainers", mock.AnythingOfType("container.Filter")).Return(cs, nil)
client.On("NetemContainer", mock.AnythingOfType("container.Container"), "eth1", "delay 120ms 25ms 15%", net.ParseIP(""), 1*time.Second).Return(nil)
client.On("StopNetemContainer", mock.AnythingOfType("container.Container"), "eth1").Return(nil)
// do action
RandomMode = true
err := Pumba{}.NetemDelayContainers(client, []string{}, "^c", cmd)
pumba := pumbaChaos{}
err := pumba.NetemDelayContainers(client, []string{}, "^c", cmd)
RandomMode = false
// asserts
assert.NoError(t, err)
Expand Down
14 changes: 9 additions & 5 deletions container/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Client interface {
RemoveImage(Container, bool, bool) error
RemoveContainer(Container, bool, bool, bool, bool) error
NetemContainer(Container, string, string, net.IP, time.Duration, bool) error
StopNetemContainer(Container, string, bool) error
PauseContainer(Container, time.Duration, bool) error
}

Expand Down Expand Up @@ -206,11 +207,14 @@ func (client dockerClient) NetemContainer(c Container, netInterface string, nete
log.Infof("%sRunning netem command '%s' on container %s with filter %s for %s", prefix, netemCmd, c.ID(), targetIP.String(), duration)
err = client.startNetemContainerIPFilter(c, netInterface, netemCmd, targetIP.String(), dryrun)
}
if err != nil {
return err
return err
}

func (client dockerClient) StopNetemContainer(c Container, netInterface string, dryrun bool) error {
prefix := ""
if dryrun {
prefix = dryRunPrefix
}
// sleep (current goroutine) for specified duration and then stop netem
time.Sleep(duration)
log.Infof("%sStopping netem on container %s", prefix, c.ID())
return client.stopNetemContainer(c, netInterface, dryrun)
}
Expand All @@ -226,7 +230,7 @@ func (client dockerClient) PauseContainer(c Container, duration time.Duration, d
return err
}
log.Debugf("Container %s paused for %s", c.ID(), duration)
// pause the current goroutine for specified duration
// TODO: FIXME: pause the current goroutine for specified duration
time.Sleep(duration)
if err := client.api.UnpauseContainer(c.ID()); err != nil {
return err
Expand Down
Loading

0 comments on commit 4f09a22

Please sign in to comment.