Commit 4888d31

A new autoscaling policy that scales the number of containers proportionally, and can be extended to incorporate load predictions.

EvanKrall committed May 3, 2017 (1 parent: 12b57e8)
Showing 4 changed files with 294 additions and 11 deletions.
paasta_itests/autoscaling_service.feature (6 additions, 0 deletions)
@@ -0,0 +1,6 @@
Feature: service autoscaling works as expected

  Scenario: save_historical_load and fetch_historical_load can save and fetch data.
    Given a working paasta cluster, with docker registry doesntmatter.lol
    Given some fake historical load data
    When I save the fake historical load data
    Then I should get the same fake historical load data back when I fetch it
paasta_itests/steps/autoscaling_service_steps.py (30 additions, 0 deletions)
@@ -0,0 +1,30 @@
from __future__ import absolute_import
from __future__ import unicode_literals

from behave import given
from behave import then
from behave import when

from paasta_tools.autoscaling import autoscaling_service_lib


@given('some fake historical load data')
def make_fake_historical_load_data(context):
    context.fake_historical_load_data = [
        (1, 10),
        (2, 20),
        (3, 10),
        (4, 1337),
    ]


@when('I save the fake historical load data')
def save_fake_historical_load_data(context):
    autoscaling_service_lib.save_historical_load(context.fake_historical_load_data, '/itest/fake_historical_load_data')


@then('I should get the same fake historical load data back when I fetch it')
def load_fake_historical_load_data(context):
    actual = autoscaling_service_lib.fetch_historical_load('/itest/fake_historical_load_data')
    expected = context.fake_historical_load_data
    assert actual == expected
paasta_tools/autoscaling/autoscaling_service_lib.py (119 additions, 7 deletions)
@@ -17,6 +17,8 @@
from __future__ import unicode_literals

import logging
import struct
import time
from collections import namedtuple
from contextlib import contextmanager
from datetime import datetime
@@ -63,6 +65,7 @@

SERVICE_METRICS_PROVIDER_KEY = 'metrics_provider'
DECISION_POLICY_KEY = 'decision_policy'
FORECAST_POLICY_KEY = 'forecast_policy'

AUTOSCALING_DELAY = 300
MAX_TASK_DELTA = 0.3
@@ -90,6 +93,14 @@ def get_decision_policy(name):
    return _autoscaling_components[DECISION_POLICY_KEY][name]


def get_forecast_policy(name):
    """
    Returns a forecast policy matching the given name. Only used by decision policies that try to forecast load, like
    the proportional decision policy.
    """
    return _autoscaling_components[FORECAST_POLICY_KEY][name]
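
Neither _autoscaling_components nor register_autoscaling_component is defined in this diff; both predate the commit. For context, a minimal sketch of the registry pattern they imply, assuming a two-level dict keyed by component type and then name (the real decorator may differ in detail):

# Sketch only; not part of this commit.
_autoscaling_components = {}


def register_autoscaling_component(name, component_type):
    def wrap(func):
        # File the function under its type ('decision_policy', 'forecast_policy', ...) and name.
        _autoscaling_components.setdefault(component_type, {})[name] = func
        return func
    return wrap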


class MetricsProviderNoDataError(ValueError):
    pass

@@ -168,6 +179,102 @@ def clamp_value(number):
    return int(round(clamp_value(Kp * error + iterm + Kd * (error - last_error) / time_delta)))


@register_autoscaling_component('proportional', DECISION_POLICY_KEY)
def proportional_decision_policy(zookeeper_path, current_instances, min_instances, max_instances, setpoint, utilization,
                                 num_healthy_instances, noop=False, offset=0.0, forecast_policy='current', **kwargs):
    """Uses a simple proportional model to decide the correct number of instances to scale to: if load is 110% of the
    setpoint, scale up by 10%. Includes correction for an offset, for containers that have a baseline utilization
    independent of the number of containers.

    The model is: utilization per container = (total load) / (number of containers) + offset.

    Total load and offset are measured in the same units as the values from your metrics provider. If you're measuring
    CPU per container, offset is the baseline CPU of an idle container, and total load is the total CPU required
    across all containers, after subtracting the offset for each container.

    :param offset: A float (should be between 0.0 and 1.0) representing the expected baseline load for each container,
                   e.g. if the metric you're using is CPU, how much CPU an idle container would use. This should never
                   be more than your setpoint: if it takes 50% CPU to run an idle container, we can't get your
                   utilization below 50% no matter how many containers we run.
    :param forecast_policy: The method for forecasting future load values. Currently only one forecaster exists:
                            "current", which assumes that the load will remain at its current value for the near
                            future.
    """

    forecast_policy_func = get_forecast_policy(forecast_policy)

    current_load = (utilization - offset) * num_healthy_instances

    historical_load = fetch_historical_load(zk_path_prefix=zookeeper_path)
    historical_load.append((time.time(), current_load))
    save_historical_load(historical_load, zk_path_prefix=zookeeper_path)

    predicted_load = forecast_policy_func(historical_load, **kwargs)

    desired_number_instances = int(round(predicted_load / (setpoint - offset)))

    # Clamp the result to the configured [min_instances, max_instances] range.
    if desired_number_instances < min_instances:
        desired_number_instances = min_instances
    if desired_number_instances > max_instances:
        desired_number_instances = max_instances

    return desired_number_instances - current_instances  # The calling function wants a delta, not an absolute value.
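
To make the arithmetic concrete, a worked example with illustrative numbers (not taken from the source): at setpoint 0.8, offset 0.1, and utilization 0.9 across 10 healthy containers, with the default 'current' forecaster:

# Illustrative numbers only.
setpoint, offset = 0.8, 0.1
utilization, num_healthy_instances, current_instances = 0.9, 10, 10

current_load = (utilization - offset) * num_healthy_instances  # (0.9 - 0.1) * 10 = 8.0
desired = int(round(current_load / (setpoint - offset)))       # round(8.0 / 0.7) = 11
delta = desired - current_instances                            # +1: scale up by one container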


@register_autoscaling_component('current', FORECAST_POLICY_KEY)
def current_value_forecast_policy(historical_load, **kwargs):
    """A prediction policy that assumes the value at any time in the future will be the same as the current value.

    :param historical_load: a list of (timestamp, value) pairs, where timestamp is a unix timestamp and value is load.
    """
    return historical_load[-1][1]
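
The forecast_policy hook is what makes this commit extensible to load predictions. As an illustration of the intended extension point (hypothetical; not part of this commit), a moving-average forecaster could be registered the same way:

# Hypothetical sketch of an additional forecaster; names and the window
# parameter are assumptions, not part of this diff.
@register_autoscaling_component('moving_average', FORECAST_POLICY_KEY)
def moving_average_forecast_policy(historical_load, moving_average_window_seconds=1800, **kwargs):
    """Predict that future load equals the mean load over a recent window."""
    window_start = historical_load[-1][0] - moving_average_window_seconds
    window = [load for timestamp, load in historical_load if timestamp >= window_start]
    return sum(window) / len(window)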


HISTORICAL_LOAD_SERIALIZATION_FORMAT = 'dd'  # Each record is a (timestamp, load) pair packed as two doubles.
SIZE_PER_HISTORICAL_LOAD_RECORD = struct.calcsize(HISTORICAL_LOAD_SERIALIZATION_FORMAT)


def zk_historical_load_path(zk_path_prefix):
    return "%s/historical_load" % zk_path_prefix


def save_historical_load(historical_load, zk_path_prefix):
    with ZookeeperPool() as zk:
        historical_load_bytes = serialize_historical_load(historical_load)
        # ensure_path creates the node if it doesn't exist yet; set then overwrites its data.
        zk.ensure_path(zk_historical_load_path(zk_path_prefix))
        zk.set(zk_historical_load_path(zk_path_prefix), historical_load_bytes)


def serialize_historical_load(historical_load):
    # ZooKeeper limits znodes to 1MB by default, so keep only as many of the most recent records as will fit.
    max_records = 1000000 // SIZE_PER_HISTORICAL_LOAD_RECORD
    historical_load = historical_load[-max_records:]
    return b''.join([struct.pack(HISTORICAL_LOAD_SERIALIZATION_FORMAT, *x) for x in historical_load])


def fetch_historical_load(zk_path_prefix):
    with ZookeeperPool() as zk:
        try:
            historical_load_bytes, _ = zk.get(zk_historical_load_path(zk_path_prefix))
            return deserialize_historical_load(historical_load_bytes)
        except NoNodeError:
            return []


def deserialize_historical_load(historical_load_bytes):
    historical_load = []

    for pos in range(0, len(historical_load_bytes), SIZE_PER_HISTORICAL_LOAD_RECORD):
        historical_load.append(
            struct.unpack(
                # unfortunately struct.unpack doesn't take keyword arguments.
                HISTORICAL_LOAD_SERIALIZATION_FORMAT,
                historical_load_bytes[pos:pos + SIZE_PER_HISTORICAL_LOAD_RECORD],
            )
        )

    return historical_load
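
As a quick sanity check of the format (illustrative, not part of the commit): struct.calcsize('dd') is 16, so each (timestamp, load) record serializes to 16 bytes, and the round trip preserves the data as tuples of floats:

records = [(1.0, 10.0), (2.0, 20.0)]
blob = serialize_historical_load(records)
assert len(blob) == 2 * SIZE_PER_HISTORICAL_LOAD_RECORD  # 2 records * 16 bytes
assert deserialize_historical_load(blob) == records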


def get_json_body_from_service(host, port, endpoint, timeout=2):
    return requests.get(
        'http://%s:%s/%s' % (host, port, endpoint),
@@ -387,13 +494,14 @@ def get_autoscaling_info(marathon_client, service, instance, cluster, soa_dir):
                                      marathon_tasks=list(marathon_tasks.values()),
                                      mesos_tasks=mesos_tasks)
        error = get_error_from_utilization(utilization=utilization,
-                                          setpoint=autoscaling_params.pop('setpoint'),
+                                          setpoint=autoscaling_params['setpoint'],
                                           current_instances=service_config.get_instances())
        new_instance_count = get_new_instance_count(utilization=utilization,
                                                    error=error,
                                                    autoscaling_params=autoscaling_params,
                                                    current_instances=service_config.get_instances(),
-                                                   marathon_service_config=service_config)
+                                                   marathon_service_config=service_config,
+                                                   num_healthy_instances=len(marathon_tasks))
        current_utilization = "{:.1f}%".format(utilization * 100)
    except MetricsProviderNoDataError:
        current_utilization = "Exception"
@@ -406,19 +514,22 @@ def get_autoscaling_info(marathon_client, service, instance, cluster, soa_dir):
    return None


-def get_new_instance_count(utilization, error, autoscaling_params, current_instances, marathon_service_config):
-    autoscaling_decision_policy = get_decision_policy(autoscaling_params.pop(DECISION_POLICY_KEY))
+def get_new_instance_count(utilization, error, autoscaling_params, current_instances, marathon_service_config,
+                           num_healthy_instances):
+    autoscaling_decision_policy = get_decision_policy(autoscaling_params[DECISION_POLICY_KEY])

    zookeeper_path = compose_autoscaling_zookeeper_root(
        service=marathon_service_config.service,
        instance=marathon_service_config.instance,
    )
    autoscaling_amount = autoscaling_decision_policy(
        utilization=utilization,
        error=error,
        min_instances=marathon_service_config.get_min_instances(),
        max_instances=marathon_service_config.get_max_instances(),
        current_instances=current_instances,
+       zookeeper_path=zookeeper_path,
+       num_healthy_instances=num_healthy_instances,
        **autoscaling_params
    )

@@ -432,7 +543,7 @@ def get_new_instance_count(utilization, error, autoscaling_params, current_instances, marathon_service_config,


def get_utilization(marathon_service_config, autoscaling_params, log_utilization_data, marathon_tasks, mesos_tasks):
-    autoscaling_metrics_provider = get_service_metrics_provider(autoscaling_params.pop(SERVICE_METRICS_PROVIDER_KEY))
+    autoscaling_metrics_provider = get_service_metrics_provider(autoscaling_params[SERVICE_METRICS_PROVIDER_KEY])

    return autoscaling_metrics_provider(
        marathon_service_config=marathon_service_config,
@@ -463,15 +574,16 @@ def autoscale_marathon_instance(marathon_service_config, marathon_tasks, mesos_tasks):
    )
    error = get_error_from_utilization(
        utilization=utilization,
-       setpoint=autoscaling_params.pop('setpoint'),
+       setpoint=autoscaling_params['setpoint'],
        current_instances=current_instances,
    )
    new_instance_count = get_new_instance_count(
        utilization=utilization,
        error=error,
        autoscaling_params=autoscaling_params,
        current_instances=current_instances,
-       marathon_service_config=marathon_service_config
+       marathon_service_config=marathon_service_config,
+       num_healthy_instances=len(marathon_tasks),
    )

    safe_downscaling_threshold = int(current_instances * 0.7)