Skip to content

Commit

Permalink
Remove autoscale force_scale methods (celery#6085)
Browse files Browse the repository at this point in the history
* Remove autoscale force_scale methods

* Remove unused variable in test
  • Loading branch information
nadflinn authored Jun 13, 2020
1 parent bf6139b commit 0715118
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 47 deletions.
15 changes: 0 additions & 15 deletions celery/worker/autoscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,6 @@ def update(self, max=None, min=None):
self.min_concurrency = min
return self.max_concurrency, self.min_concurrency

def force_scale_up(self, n):
with self.mutex:
new = self.processes + n
if new > self.max_concurrency:
self._update_consumer_prefetch_count(new)
self.max_concurrency = new
self._grow(n)

def force_scale_down(self, n):
with self.mutex:
new = self.processes - n
if new < self.min_concurrency:
self.min_concurrency = max(new, 0)
self._shrink(min(n, self.processes))

def scale_up(self, n):
self._last_scale_up = monotonic()
return self._grow(n)
Expand Down
4 changes: 2 additions & 2 deletions celery/worker/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def memdump(state, samples=10, **kwargs): # pragma: no cover
def pool_grow(state, n=1, **kwargs):
"""Grow pool by n processes/threads."""
if state.consumer.controller.autoscaler:
state.consumer.controller.autoscaler.force_scale_up(n)
return nok("pool_grow is not supported with autoscale. Adjust autoscale range instead.")
else:
state.consumer.pool.grow(n)
state.consumer._update_prefetch_count(n)
Expand All @@ -483,7 +483,7 @@ def pool_grow(state, n=1, **kwargs):
def pool_shrink(state, n=1, **kwargs):
"""Shrink pool by n processes/threads."""
if state.consumer.controller.autoscaler:
state.consumer.controller.autoscaler.force_scale_down(n)
return nok("pool_shrink is not supported with autoscale. Adjust autoscale range instead.")
else:
state.consumer.pool.shrink(n)
state.consumer._update_prefetch_count(-n)
Expand Down
35 changes: 10 additions & 25 deletions t/unit/worker/test_autoscale.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,20 @@ def test_shrink_raises_ValueError(self, debug):
x.scale_down(1)
assert debug.call_count

def test_update_and_force(self):
def test_update(self):
worker = Mock(name='worker')
x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
x.worker.consumer.prefetch_multiplier = 1
x.keepalive = 0
assert x.processes == 3
x.force_scale_up(5)
assert x.processes == 8
x.update(5, None)
assert x.processes == 5
x.force_scale_down(3)
assert x.processes == 2
x.update(None, 3)
assert x.processes == 3
x.force_scale_down(1000)
assert x.min_concurrency == 0
assert x.processes == 0
x.force_scale_up(1000)
x.min_concurrency = 1
x.force_scale_down(1)
x.scale_up(5)
x.update(7, None)
assert x.processes == 7
assert x.max_concurrency == 7
x.scale_down(4)
x.update(None, 6)
assert x.processes == 6
assert x.min_concurrency == 6

x.update(max=300, min=10)
x.update(max=300, min=2)
Expand All @@ -192,16 +187,6 @@ def test_prefetch_count_on_updates_prefetch_multiplier_gt_one(self):
x.update(15, 7)
worker.consumer._update_prefetch_count.assert_called_with(10)

def test_prefetch_count_on_force_up(self):
worker = Mock(name='worker')
x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
x.worker.consumer.prefetch_multiplier = 1

x.force_scale_up(5)
worker.consumer._update_prefetch_count.assert_not_called()
x.force_scale_up(5)
worker.consumer._update_prefetch_count.assert_called_with(3)

def test_info(self):
worker = Mock(name='worker')
x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
Expand Down
9 changes: 4 additions & 5 deletions t/unit/worker/test_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,10 @@ def num_processes(self):

panel.state.consumer = Mock()
panel.state.consumer.controller = Mock()
sc = panel.state.consumer.controller.autoscaler = Mock()
panel.handle('pool_grow')
sc.force_scale_up.assert_called()
panel.handle('pool_shrink')
sc.force_scale_down.assert_called()
r = panel.handle('pool_grow')
assert 'error' in r
r = panel.handle('pool_shrink')
assert 'error' in r

def test_add__cancel_consumer(self):

Expand Down

0 comments on commit 0715118

Please sign in to comment.