diff --git a/paasta_tools/autoscaling/autoscaling_service_lib.py b/paasta_tools/autoscaling/autoscaling_service_lib.py index 6a4aa97947..0a28d6574e 100644 --- a/paasta_tools/autoscaling/autoscaling_service_lib.py +++ b/paasta_tools/autoscaling/autoscaling_service_lib.py @@ -243,22 +243,72 @@ def current_value_forecast_policy(historical_load, **kwargs): return historical_load[-1][1] +def window_historical_load(historical_load, window_begin, window_end): + """Filter historical_load down to just the datapoints lying between times window_begin and window_end, inclusive.""" + filtered = [] + for timestamp, value in historical_load: + if timestamp >= window_begin and timestamp <= window_end: + filtered.append((timestamp, value)) + return filtered + + +def trailing_window_historical_load(historical_load, window_size): + window_end, _ = historical_load[-1] + window_begin = window_end - window_size + return window_historical_load(historical_load, window_begin, window_end) + + @register_autoscaling_component('moving_average', FORECAST_POLICY_KEY) def moving_average_forecast_policy(historical_load, moving_average_window_seconds, **kwargs): """Does a simple average of all historical load data points within the moving average window. Weights all data points within the window equally.""" - window_end, _ = historical_load[-1] - window_begin = window_end - moving_average_window_seconds + windowed_data = trailing_window_historical_load(historical_load, moving_average_window_seconds) + windowed_values = [value for timestamp, value in windowed_data] + return sum(windowed_values) / len(windowed_values) - count = 0 - total = 0 - for timestamp, value in historical_load: - if timestamp >= window_begin and timestamp <= window_end: - count += 1 - total += value - return total / count +@register_autoscaling_component('linreg', FORECAST_POLICY_KEY) +def linreg_forecast_policy(historical_load, linreg_window_seconds, linreg_extrapolation_seconds, + linreg_default_slope=0, **kwargs): + """Does a linear regression on the load data within the last linreg_window_seconds. For every time delta in + linreg_extrapolation_seconds, forecasts the value at that time delta from now, and returns the maximum of these + predicted values. (With linear extrapolation, it doesn't make sense to forecast at more than two points, as the max + load will always be at the first or last time delta.) + + :param linreg_window_seconds: Consider all data from this many seconds ago until now. + :param linreg_extrapolation_seconds: A list of floats representing a number of seconds in the future at which to + predict the load. The highest prediction will be returned. + :param linreg_default_slope: If there is only one data point within the window, the equation for slope is undefined, + so we use this value (expressed in load/second) for prediction instead. Default is + 0. + + """ + + window = trailing_window_historical_load(historical_load, linreg_window_seconds) + + loads = [load for timestamp, load in window] + times = [timestamp for timestamp, load in window] + + mean_time = sum(times) / len(times) + mean_load = sum(loads) / len(loads) + + if len(window) > 1: + slope = sum((t - mean_time) * (l - mean_load) for t, l in window) / sum((t - mean_time) ** 2 for t in times) + else: + slope = linreg_default_slope + + intercept = mean_load - slope * mean_time + + def predict(timestamp): + return slope * timestamp + intercept + + if isinstance(linreg_extrapolation_seconds, (int, float)): + linreg_extrapolation_seconds = [linreg_extrapolation_seconds] + + now, _ = historical_load[-1] + forecasted_values = [predict(now + delta) for delta in linreg_extrapolation_seconds] + return max(forecasted_values) HISTORICAL_LOAD_SERIALIZATION_FORMAT = 'dd' diff --git a/tests/autoscaling/test_autoscaling_service_lib.py b/tests/autoscaling/test_autoscaling_service_lib.py index a05237a6ba..cbd525001f 100644 --- a/tests/autoscaling/test_autoscaling_service_lib.py +++ b/tests/autoscaling/test_autoscaling_service_lib.py @@ -1401,3 +1401,66 @@ def test_moving_average_forecast_policy(): moving_average_window_seconds=5) assert 220 == autoscaling_service_lib.moving_average_forecast_policy(historical_load, moving_average_window_seconds=0.5) + + +def test_linreg_forecast_policy(): + historical_load = [ + (1, 100), + (2, 120), + (3, 140), + (4, 160), + (5, 180), + (6, 200), + (7, 220), + ] + + assert 220 == autoscaling_service_lib.linreg_forecast_policy( + historical_load, + linreg_window_seconds=7, + linreg_extrapolation_seconds=0, + ) + assert 1000 == autoscaling_service_lib.linreg_forecast_policy( + historical_load, + linreg_window_seconds=7, + linreg_extrapolation_seconds=39, + ) + + # We should handle the case where there's only 1 data point within the window. + assert 220 == autoscaling_service_lib.linreg_forecast_policy( + historical_load, + linreg_window_seconds=0, + linreg_extrapolation_seconds=0, + ) + assert 220 == autoscaling_service_lib.linreg_forecast_policy( + historical_load, + linreg_window_seconds=0, + linreg_extrapolation_seconds=10, + ) + assert 1000 == autoscaling_service_lib.linreg_forecast_policy( + historical_load, + linreg_window_seconds=0, + linreg_extrapolation_seconds=78, + linreg_default_slope=10, + ) + + historical_load_2 = [ + (1, 100), + (2, 100), + (3, 100), + (4, 100), + (5, 100), + (6, 100), + + (1, 100), + (2, 200), + (3, 300), + (4, 400), + (5, 500), + (6, 600), + ] + + assert 350 == autoscaling_service_lib.linreg_forecast_policy( + historical_load_2, + linreg_window_seconds=7, + linreg_extrapolation_seconds=0, + )