Skip to content

Commit

Permalink
Implement a forecaster that uses linear regression over a trailing wi…
Browse files Browse the repository at this point in the history
…ndow to estimate slope and level of the load.
  • Loading branch information
EvanKrall committed May 10, 2017
1 parent 5c50327 commit a4dda95
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 9 deletions.
68 changes: 59 additions & 9 deletions paasta_tools/autoscaling/autoscaling_service_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,22 +243,72 @@ def current_value_forecast_policy(historical_load, **kwargs):
return historical_load[-1][1]


def window_historical_load(historical_load, window_begin, window_end):
    """Return the (timestamp, value) datapoints whose timestamp lies within [window_begin, window_end].

    Both ends of the window are inclusive. The relative order of the datapoints in historical_load is preserved,
    and each returned datapoint is a fresh (timestamp, value) tuple.
    """
    return [
        (timestamp, value)
        for timestamp, value in historical_load
        if window_begin <= timestamp <= window_end
    ]


def trailing_window_historical_load(historical_load, window_size):
    """Return the datapoints from the last window_size seconds of historical_load.

    The window ends at the timestamp of the final datapoint in historical_load and starts window_size before it;
    both ends are inclusive.
    """
    latest_timestamp, _ = historical_load[-1]
    earliest_timestamp = latest_timestamp - window_size
    return window_historical_load(historical_load, earliest_timestamp, latest_timestamp)


@register_autoscaling_component('moving_average', FORECAST_POLICY_KEY)
def moving_average_forecast_policy(historical_load, moving_average_window_seconds, **kwargs):
    """Does a simple average of all historical load data points within the moving average window. Weights all data
    points within the window equally.

    :param historical_load: A sequence of (timestamp, value) pairs. The window ends at the timestamp of the last
                            pair, so the sequence must not be empty.
    :param moving_average_window_seconds: Consider all data from this many seconds before the last data point.
    :returns: The mean of the values that fall within the window.
    """
    windowed_data = trailing_window_historical_load(historical_load, moving_average_window_seconds)
    # The window always includes the final data point, so there is at least one value to average.
    windowed_values = [value for timestamp, value in windowed_data]
    return sum(windowed_values) / len(windowed_values)


@register_autoscaling_component('linreg', FORECAST_POLICY_KEY)
def linreg_forecast_policy(historical_load, linreg_window_seconds, linreg_extrapolation_seconds,
                           linreg_default_slope=0, **kwargs):
    """Does a linear regression on the load data within the last linreg_window_seconds. For every time delta in
    linreg_extrapolation_seconds, forecasts the value at that time delta from now, and returns the maximum of these
    predicted values. (With linear extrapolation, it doesn't make sense to forecast at more than two points, as the max
    load will always be at the first or last time delta.)

    :param historical_load: A sequence of (timestamp, load) pairs. "Now" is taken to be the timestamp of the last
                            pair, so the sequence must not be empty.
    :param linreg_window_seconds: Consider all data from this many seconds ago until now.
    :param linreg_extrapolation_seconds: A list of floats representing a number of seconds in the future at which to
                                         predict the load. The highest prediction will be returned. A single int or
                                         float is also accepted and treated as a one-element list.
    :param linreg_default_slope: If the data points within the window do not span more than one distinct timestamp
                                 (e.g. there is only one data point), the equation for slope is undefined, so we use
                                 this value (expressed in load/second) for prediction instead. Default is 0.
    :returns: The highest load predicted across the requested extrapolation points.
    """

    window = trailing_window_historical_load(historical_load, linreg_window_seconds)

    loads = [load for timestamp, load in window]
    times = [timestamp for timestamp, load in window]

    mean_time = sum(times) / len(times)
    mean_load = sum(loads) / len(loads)

    # Ordinary least squares: slope = sum((t - mean_t) * (y - mean_y)) / sum((t - mean_t) ** 2).
    # The denominator is zero whenever every timestamp in the window is identical (including the single-point case),
    # in which case the slope is undefined and we fall back to the configured default rather than dividing by zero.
    time_sum_of_squares = sum((t - mean_time) ** 2 for t in times)
    if time_sum_of_squares > 0:
        covariance_sum = sum((t - mean_time) * (load - mean_load) for t, load in window)
        slope = covariance_sum / time_sum_of_squares
    else:
        slope = linreg_default_slope

    # The regression line always passes through the point (mean_time, mean_load).
    intercept = mean_load - slope * mean_time

    def predict(timestamp):
        return slope * timestamp + intercept

    if isinstance(linreg_extrapolation_seconds, (int, float)):
        linreg_extrapolation_seconds = [linreg_extrapolation_seconds]

    now, _ = historical_load[-1]
    forecasted_values = [predict(now + delta) for delta in linreg_extrapolation_seconds]
    return max(forecasted_values)


# Format string used when serializing historical load datapoints.
# NOTE(review): 'dd' looks like a `struct` format for a pair of doubles (timestamp, load); the code that consumes
# this constant is not visible here, so confirm before relying on that.
HISTORICAL_LOAD_SERIALIZATION_FORMAT = 'dd'
Expand Down
63 changes: 63 additions & 0 deletions tests/autoscaling/test_autoscaling_service_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,3 +1401,66 @@ def test_moving_average_forecast_policy():
moving_average_window_seconds=5)
assert 220 == autoscaling_service_lib.moving_average_forecast_policy(historical_load,
moving_average_window_seconds=0.5)


def test_linreg_forecast_policy():
    """Check linreg_forecast_policy on a perfectly linear series, a one-point window, and unsorted mixed data."""
    forecast = autoscaling_service_lib.linreg_forecast_policy

    # Load climbs by 20 every second: (1, 100), (2, 120), ..., (7, 220).
    rising_load = [(second, 80 + 20 * second) for second in range(1, 8)]

    # With the whole series in the window the fit is exact, so extrapolation follows the line.
    assert forecast(
        rising_load,
        linreg_window_seconds=7,
        linreg_extrapolation_seconds=0,
    ) == 220
    assert forecast(
        rising_load,
        linreg_window_seconds=7,
        linreg_extrapolation_seconds=39,
    ) == 1000

    # We should handle the case where there's only 1 data point within the window.
    assert forecast(
        rising_load,
        linreg_window_seconds=0,
        linreg_extrapolation_seconds=0,
    ) == 220
    assert forecast(
        rising_load,
        linreg_window_seconds=0,
        linreg_extrapolation_seconds=10,
    ) == 220
    assert forecast(
        rising_load,
        linreg_window_seconds=0,
        linreg_extrapolation_seconds=78,
        linreg_default_slope=10,
    ) == 1000

    # Two interleaved series over the same timestamps: one flat at 100, one rising by 100 per second.
    flat_series = [(second, 100) for second in range(1, 7)]
    steep_series = [(second, 100 * second) for second in range(1, 7)]
    mixed_load = flat_series + steep_series

    assert forecast(
        mixed_load,
        linreg_window_seconds=7,
        linreg_extrapolation_seconds=0,
    ) == 350

0 comments on commit a4dda95

Please sign in to comment.