Skip to content

Commit

Permalink
Refactor gradual config logic away from the controller
Browse files Browse the repository at this point in the history
Not only this simplifies how the gradual config is triggered, but it
also makes the gradual config transparent to the controller: the
controller just applies a new rate every so much time. This not just
makes the controller much simpler, but also, makes it possible to give
it rate changes that are not necessarily only linear, but that could
have more interesting shapes.
  • Loading branch information
NelsonVides committed Nov 27, 2024
1 parent 4972b26 commit 8efe185
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 228 deletions.
94 changes: 58 additions & 36 deletions src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@
%% Atom representing the name of the throttle.

-type rate() :: infinity | non_neg_integer().
%% Number of events per given `t:interval/0', or
%% - infinity for effectively unlocking all throttling,
%% - zero for effectively pausing the throttle.
%% Number of events per given `t:interval/0'.
%%
%% It can also be:
%% <ul>
%% <li>infinity for effectively unlocking all throttling,</li>
%% <li>zero for effectively pausing the throttle.</li>
%% </ul>

-type interarrival() :: infinity | non_neg_integer().
%% Time in milliseconds between two events, or
%% - infinity for effectively pausing the throttle,
%% - zero for effectively unlocking all throttling.
%% Time in milliseconds between two events.
%%
%% It can also be:
%% <ul>
%% <li>infinity for effectively pausing the throttle,</li>
%% <li>zero for effectively unlocking all throttling.</li>
%% </ul>

-type interval() :: non_neg_integer().
%% In milliseconds, defaults to 60000 (one minute).
Expand All @@ -32,40 +40,44 @@
#{interarrival := interarrival()}.
%% Throttle unit of measurement

-type gradual_rate_config() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval => interval(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()} |
#{from_interarrival := interarrival(),
to_interarrival := interarrival(),
step_interval => pos_integer(),
step_size => pos_integer(),
step_count => pos_integer(),
duration => pos_integer()}.
%% Configuration for a gradual throttle rate change
%%
%% `From' and `To' rates are required.
%% `interval' defaults to 1s and `step_size' to 1 (or -1 if applies),
%% that is, the throttle will be changed in increments of 1.
-type gradual() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval => interval()} |
#{from_interarrival := non_neg_integer(),
to_interarrival := non_neg_integer()}.
%% Configuration throttle for a gradual rate change.
%%
%% `duration' is an alternative way of setting `step_interval',
%% and `step_count' is an alternative for `step_size'.
%% "from" and "to" prefixed parameters, whether rates or interarrivals, are required.
%% `interval' applies only to rate and defaults to 1s.

-type plan() :: #{step_interval := pos_integer(),
step_count := pos_integer()} |
#{duration := pos_integer()}.
%% Configuration plan for a gradual rate change.
%%
%% All other values can be calculated from the provided.
%% The throttle mechanism will take a series of discrete steps,
%% for as long as the duration given,
%% or in the shape of the `step_interval' and `step_count'.

-type gradual_plan() :: #{throttle := gradual(),
plan := plan()}.
%% Gradual plan details. Must specify a `t:gradual/0', and a `t:plan/0'.

-export_type([t/0, name/0, rate/0, interval/0, gradual_rate_config/0]).
-export_type([t/0, name/0, rate/0, interval/0, gradual_plan/0]).

%% @doc Starts the throttle mechanism for a given `Name' with a given config.
%%
%% `Name' is needed to identify the rate as a single test can have different rates for different tasks.
-spec start(name(), t() | rate()) -> {ok, started | already_started} | {error, any()}.
start(Name, #{} = Config) ->
amoc_throttle_controller:ensure_throttle_processes_started(Name, Config);
start(Name, Rate) ->
amoc_throttle_controller:ensure_throttle_processes_started(Name, #{rate => Rate}).
case amoc_throttle_config:verify_config(Config) of
{error, Error} ->
{error, Error};
VerifiedConfig ->
amoc_throttle_controller:ensure_throttle_processes_started(Name, VerifiedConfig)
end;
start(Name, Rate) when is_integer(Rate) ->
start(Name, #{rate => Rate}).

%% @doc Pauses executions for the given `Name' as if `Rate' was set to `0'.
%%
Expand All @@ -89,22 +101,32 @@ resume(Name) ->
%% @doc Sets the throttle `Config' for `Name' according to the given values.
-spec change_rate(name(), t() | rate()) -> ok | {error, any()}.
change_rate(Name, #{} = Config) ->
amoc_throttle_controller:change_rate(Name, Config);
case amoc_throttle_config:verify_config(Config) of
{error, Error} ->
{error, Error};
VerifiedConfig ->
amoc_throttle_controller:change_rate(Name, VerifiedConfig)
end;
change_rate(Name, Rate) when is_integer(Rate) ->
amoc_throttle_controller:change_rate(Name, #{rate => Rate}).
change_rate(Name, #{rate => Rate}).

%% @doc Allows to set a plan of gradual rate changes for a given `Name'.
%%
%% The configuration will be changed in a series of consecutive steps.
%% Rates can be changed upwards as well as downwards.
%% See the documentation for `t:gradual_rate_config/0' for more info.
%% See the documentation for `t:gradual_plan/0' for more info.
%%
%% Be aware that, at first, the rate will be changed to the initial point given
%% in the configuration, and this is not considered a step.
-spec change_rate_gradually(name(), gradual_rate_config()) ->
-spec change_rate_gradually(name(), gradual_plan()) ->
ok | {error, any()}.
change_rate_gradually(Name, Config) ->
amoc_throttle_controller:change_rate_gradually(Name, Config).
case amoc_throttle_config:verify_gradual_config(Config) of
{error, _} = Error ->
Error;
VerifiedConfig ->
amoc_throttle_controller:change_rate_gradually(Name, VerifiedConfig)
end.

%% @doc Executes a given function `Fn' when it does not exceed the rate for `Name'.
%%
Expand Down
123 changes: 48 additions & 75 deletions src/throttle/amoc_throttle_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,23 @@
%% @copyright 2024 Erlang Solutions Ltd.
-module(amoc_throttle_config).

-include_lib("kernel/include/logger.hrl").

-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)).
-define(NON_NEG_INT(N), (is_integer(N) andalso N >= 0)).
-define(POS_INT(N), (is_integer(N) andalso N > 0)).
-define(DEFAULT_INTERVAL, 60000). %% one minute
-define(DEFAULT_STEP_SIZE, 1).
-define(DEFAULT_STEP_INTERVAL, 100). %% every 100ms

-export([verify_config/1, verify_gradual_config/1, pool_config/2, process_pool_config/2]).
-export_type([config/0, gradual/0, pool_config/0]).
-export_type([config/0, gradual_plan/0, pool_config/0]).

-type process_number() :: non_neg_integer().
-type config() :: #{rate := amoc_throttle:rate(),
interval := amoc_throttle:interval()}.
-type gradual() :: #{from_rate := non_neg_integer(),
to_rate := non_neg_integer(),
interval := amoc_throttle:interval(),
step_interval := pos_integer(),
step_size := pos_integer(),
step_count := pos_integer()}.
-type gradual_plan() :: #{rates := [non_neg_integer()],
interval := amoc_throttle:interval(),
step_interval := non_neg_integer()}.
-type pool_config() :: #{process_number() :=
#{max_n := infinity | non_neg_integer(),
delay := non_neg_integer(),
Expand All @@ -46,12 +45,13 @@ verify_config(#{rate := Rate} = Config)
verify_config(_Config) ->
{error, invalid_throttle}.

-spec verify_gradual_config(amoc_throttle:gradual_rate_config()) ->
gradual() | {error, any()}.
-spec verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan() | {error, any()}.
verify_gradual_config(Config) ->
try do_verify_gradual_config(Config) of
Change -> Change
catch error:Reason ->
catch error:Reason:Stacktrace ->
?LOG_WARNING(#{what => bad_gradual_config,
reason => Reason, stacktrace => Stacktrace}),
{error, Reason}
end.

Expand Down Expand Up @@ -113,72 +113,45 @@ calculate_availability(RatePerMinutePerProcess, DelayPerProcess, NoOfProcesses,
no_of_processes() ->
3 * erlang:system_info(schedulers_online).

-spec do_verify_gradual_config(amoc_throttle:gradual_rate_config()) -> gradual().
do_verify_gradual_config(
#{from_interarrival := FromInterarrival, to_interarrival := ToInterarrival} = Config0)
when ?POS_INT(FromInterarrival), ?POS_INT(ToInterarrival), not is_map_key(interval, Config0) ->
FromRate = ?DEFAULT_INTERVAL div FromInterarrival,
ToRate = ?DEFAULT_INTERVAL div ToInterarrival,
Config1 = Config0#{from_rate => FromRate, to_rate => ToRate},
do_verify_gradual_config(maps:without([from_interarrival, to_interarrival], Config1));
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate,
step_interval := StepInterval, step_count := StepCount, step_size := StepSize} = Config) ->
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
check_step_size_with_from_to_rate(FromRate, ToRate, StepSize),
check_step_parameters((ToRate - FromRate) div StepCount, StepSize),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate, duration := Duration}) ->
StepSize = ?DEFAULT_STEP_SIZE * step_size_sign(FromRate, ToRate),
StepCount = abs((ToRate - FromRate) div StepSize),
StepInterval = abs(Duration div (ToRate - FromRate)),
#{from_rate => FromRate, to_rate => ToRate, interval => ?DEFAULT_INTERVAL,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate,
step_interval := StepInterval, step_size := StepSize} = Config) ->
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
check_step_size_with_from_to_rate(FromRate, ToRate, StepSize),
StepCount = abs((ToRate - FromRate) div StepSize),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
-spec do_verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan().
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate, step_interval := StepInterval} = Config) ->
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
StepSize = ?DEFAULT_STEP_SIZE * step_size_sign(FromRate, ToRate),
StepCount = abs((ToRate - FromRate) div StepSize),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate, step_count := StepCount} = Config) ->
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
StepSize = ?DEFAULT_STEP_SIZE * step_size_sign(FromRate, ToRate),
StepInterval = (ToRate - FromRate) div (StepSize * StepCount),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate, step_size := StepSize} = Config) ->
check_step_size_with_from_to_rate(FromRate, ToRate, StepSize),
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
StepCount = abs((ToRate - FromRate) div StepSize),
StepInterval = abs((ToRate - FromRate) div StepCount),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize};
#{throttle := #{from_rate := FromRate, to_rate := ToRate, interval := Interval} = Throttle,
plan := #{step_interval := StepInterval, step_count := StepCount} = Plan})
when 3 =:= map_size(Throttle), 2 =:= map_size(Plan),
?NON_NEG_INT(FromRate), ?NON_NEG_INT(ToRate), ?NON_NEG_INT(Interval),
?POS_INT(StepInterval), ?POS_INT(StepCount) ->
StepRate = (ToRate - FromRate) / StepCount,
StepPlan = [ calculate_step(Step, StepRate, FromRate) || Step <- lists:seq(0, StepCount) ],
#{rates => StepPlan, interval => Interval, step_interval => StepInterval};

do_verify_gradual_config(
#{from_rate := FromRate, to_rate := ToRate} = Config) ->
StepSize = ?DEFAULT_STEP_SIZE * step_size_sign(FromRate, ToRate),
RateInterval = maps:get(interval, Config, ?DEFAULT_INTERVAL),
StepCount = abs((ToRate - FromRate) div StepSize),
StepInterval = abs((ToRate - FromRate) div StepCount),
#{from_rate => FromRate, to_rate => ToRate, interval => RateInterval,
step_interval => StepInterval, step_count => StepCount, step_size => StepSize}.
#{throttle := #{from_rate := _, to_rate := _} = Throttle} = Config0)
when 2 =:= map_size(Throttle) ->
Config1 = Config0#{throttle := Throttle#{interval => ?DEFAULT_INTERVAL}},
do_verify_gradual_config(Config1);

check_step_size_with_from_to_rate(From, To, StepSize) when From =< To, StepSize >= 0 -> ok;
check_step_size_with_from_to_rate(From, To, StepSize) when From > To, StepSize < 0 -> ok.
do_verify_gradual_config(
#{throttle := #{from_interarrival := FromInterarrival,
to_interarrival := ToInterarrival} = Throttle} = Config0)
when ?NON_NEG_INT(FromInterarrival), ?NON_NEG_INT(ToInterarrival), 2 =:= map_size(Throttle) ->
FromRate = ?DEFAULT_INTERVAL div FromInterarrival,
ToRate = ?DEFAULT_INTERVAL div ToInterarrival,
Config1 = Config0#{throttle := #{from_rate => FromRate, to_rate => ToRate}},
do_verify_gradual_config(Config1);

step_size_sign(From, To) when From =< To -> 1;
step_size_sign(From, To) when From > To -> -1.
do_verify_gradual_config(
#{throttle := #{from_rate := FromRate, to_rate := ToRate, interval := Interval} = Throttle,
plan := #{duration := Duration} = Plan} = Config0)
when 3 =:= map_size(Throttle), 1 =:= map_size(Plan),
?NON_NEG_INT(FromRate), ?NON_NEG_INT(ToRate), ?NON_NEG_INT(Interval), ?POS_INT(Duration) ->
StepCount = abs(Duration div ?DEFAULT_STEP_INTERVAL),
Config1 = Config0#{plan := #{step_interval => ?DEFAULT_STEP_INTERVAL, step_count => StepCount}},
do_verify_gradual_config(Config1).

check_step_parameters(StepSize, StepSize) -> ok.
-spec calculate_step(
Step :: non_neg_integer(),
StepRate :: float(),
FromRate :: non_neg_integer()) ->
non_neg_integer().
calculate_step(N, StepRate, From) ->
From + floor(StepRate * N).
Loading

0 comments on commit 8efe185

Please sign in to comment.