Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interarrival as throttle #186

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ Amoc supports the following generic configuration parameters:
* default value - empty list (`[]`)
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`


* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
for two consecutive users:
* default value - 50 ms.
* example: `AMOC_INTERARRIVAL="50"`
* `user_rate` - a rate (implemented as a `t:amoc_throttle:rate/0`) of user processes per minute:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is t:amoc_throttle:rate/0?

* default value - 1200.
* example: `AMOC_USER_RATE="600"`
* this parameter can be updated at runtime (in the same way as scenario configuration).

* `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface
Expand Down
2 changes: 1 addition & 1 deletion src/amoc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ do(Scenario, Count, Settings) ->
case {amoc_controller:start_scenario(Scenario, Settings), Count} of
{ok, 0} -> ok;
{ok, Count} -> amoc_controller:add_users(1, Count);
Error -> Error
{Error, _} -> Error
DenysGonchar marked this conversation as resolved.
Show resolved Hide resolved
end;
Error -> Error
end.
Expand Down
147 changes: 60 additions & 87 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,46 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(DEFAULT_USER_RATE, 1200).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, non_neg_integer, 1},
description => "a delay between creating the processes for two "
"consecutive users (ms, def: 50ms)",
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
-required_variable(#{name => user_rate, default_value => ?DEFAULT_USER_RATE,
verification => {?MODULE, verify_user_rate, 1},
description => "Throttle rate for the Scenario:start/1,2 callback",
update => {?MODULE, update_user_rate, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
status = idle :: status(),
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
scenario_state :: any(), %% state returned from Scenario:init/0
create_users = [] :: [amoc_scenario:user_id()],
tref :: timer:tref() | undefined}).
scenario_state :: amoc_scenario:state() %% state returned from Scenario:init/0
}).

-type status() :: idle | running | terminating | finished | {error, any()} | disabled.
%% Scenario status.

-type state() :: #state{}.
%% Internal state of the node's controller

-type handle_call_res() :: ok | {ok, term()} | {error, term()}.
-type running_status() :: #{scenario := amoc:scenario(),
currently_running_users := user_count(),
highest_user_id := last_user_id()}.
%% Details about the scenario currently running

-type amoc_status() :: idle |
{running, running_status()} |
{terminating, amoc:scenario()} |
{finished, amoc:scenario()} |
{error, any()} |
disabled.
%% Status of the node, note that amoc_controller is disabled for the master node

-type user_count() :: non_neg_integer().
%% Number of users currently running in the node

-type last_user_id() :: non_neg_integer().
%% Highest user id registered in the node
-type interarrival() :: non_neg_integer().

-type user_rate() :: amoc_throttle:rate().
%% Time to wait in between spawning new users

%% ------------------------------------------------------------------
Expand All @@ -65,14 +71,14 @@
%% ------------------------------------------------------------------
%% Parameters verification functions
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
-export([update_user_rate/2, verify_user_rate/1]).

-export([zero_users_running/0]).
-export([wait_user_rate/0, zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% ------------------------------------------------------------------
%% API Function Definitions
Expand Down Expand Up @@ -122,14 +128,16 @@ disable() ->
gen_server:call(?SERVER, disable).

%% @private
-spec non_neg_integer(any()) -> boolean().
non_neg_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival >= 0.
-spec verify_user_rate(any()) -> boolean().
verify_user_rate(UserRate) ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use braces to make expression more readable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(is_integer(UserRate) andalso (0 =< UserRate))
     orelse  (infinity =:= UserRate)

(infinity =:= UserRate)
orelse is_integer(UserRate)
andalso (0 =< UserRate).

%% @private
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
-spec update_user_rate(user_rate, user_rate()) -> ok.
update_user_rate(user_rate, UserRate) ->
ok = amoc_throttle:change_rate(user_rate, #{rate => UserRate}).

%% @private
-spec zero_users_running() -> ok.
Expand Down Expand Up @@ -180,8 +188,6 @@ handle_call(_Request, _From, State) ->

%% @private
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
Expand All @@ -190,15 +196,14 @@ handle_cast(_Msg, State) ->

%% @private
-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info(start_all_users, State) ->
NewSate = handle_start_all_users(State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.

%% @private
-spec terminate(term(), state()) -> any().
terminate(_Reason, _State) ->
amoc_users_sup:terminate_all_children().

%% ------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------
Expand Down Expand Up @@ -243,18 +248,15 @@ handle_update_settings(_Settings, #state{status = Status}) ->

-spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) ->
{handle_call_res(), state()}.
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
handle_add(StartId, EndId, #state{status = running,
last_user_id = LastId,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
LastId < StartId ->
scenario_state = ScenarioState} = State)
when StartId =< EndId, LastId < StartId ->
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
#{scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
amoc_users_sup:start_children(Scenario, {StartId, EndId}, ScenarioState),
{ok, State#state{last_user_id = EndId}};
handle_add(_StartId, _EndId, #state{status = running} = State) ->
{{error, invalid_range}, State};
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
Expand Down Expand Up @@ -287,23 +289,6 @@ handle_disable(#state{status = idle} = State) ->
handle_disable(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_start_all_users(state()) -> state().
handle_start_all_users(#state{create_users = AllUsers,
scenario = Scenario,
scenario_state = ScenarioState,
tref = TRef} = State) ->
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
Expand All @@ -316,12 +301,15 @@ start_tables() -> %% ETS creation
{ok | error, any()}.
init_scenario(Scenario, Settings) ->
case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of
ok -> amoc_scenario:init(Scenario);
ok ->
start_user_rate(),
amoc_scenario:init(Scenario);
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
stop_user_rate(),
amoc_scenario:terminate(Scenario, ScenarioState).

-spec handle_zero_users_running(state()) -> state().
Expand All @@ -331,35 +319,20 @@ handle_zero_users_running(#state{status = terminating} = State) ->
handle_zero_users_running(State) ->
State.

-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
maybe_stop_timer(undefined) ->
undefined;
maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec get_interarrival() -> interarrival().
get_interarrival() ->
amoc_config:get(interarrival).

-spec maybe_update_interarrival_timer(state()) -> state().
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
State;
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
{ok, cancel} = timer:cancel(TRef),
Value = get_interarrival(),
NewTRef = do_interarrival(Value),
State#state{tref = NewTRef}.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
Value = get_interarrival(),
do_interarrival(Value);
maybe_start_timer(TRef) -> TRef.

do_interarrival(0) ->
self() ! start_all_users,
undefined;
do_interarrival(Value) ->
{ok, NewTRef} = timer:send_interval(Value, start_user),
NewTRef.
-spec get_user_rate() -> user_rate().
get_user_rate() ->
amoc_config:get(user_rate, ?DEFAULT_USER_RATE).

-spec wait_user_rate() -> boolean().
wait_user_rate() ->
0 =:= get_user_rate()
orelse ok =:= amoc_throttle:wait(user_rate).

-spec start_user_rate() -> any().
start_user_rate() ->
UserRate = get_user_rate(),
amoc_throttle:start(user_rate, #{rate => UserRate}).

-spec stop_user_rate() -> any().
stop_user_rate() ->
amoc_throttle:stop(user_rate).
2 changes: 1 addition & 1 deletion src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
plan := plan()}.
%% Gradual plan details. Must specify a `t:gradual/0', and a `t:plan/0'.

-export_type([t/0, name/0, rate/0, interval/0, gradual_plan/0]).
-export_type([t/0, name/0, rate/0, interval/0, interarrival/0, gradual_plan/0]).

%% @doc Starts the throttle mechanism for a given `Name' with a given config.
%%
Expand Down
3 changes: 2 additions & 1 deletion src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ensure_throttle_processes_started/2,
pause/1, resume/1, stop/1, get_info/1,
change_rate/2, change_rate_gradually/2,
pg_scope/0,
pg_scope/0, consume_all_timer_ticks/1,
get_throttle_process/1,
raise_event_on_slave_node/2, telemetry_event/2]).

Expand Down Expand Up @@ -267,6 +267,7 @@ continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate | Rates]} = Pla
NewPlan = Plan#change_rate_plan{rates = Rates},
State#{Name => Info1#throttle_info{change_plan = NewPlan}}.

-spec consume_all_timer_ticks(any()) -> ok.
consume_all_timer_ticks(Msg) ->
receive
Msg -> consume_all_timer_ticks(Msg)
Expand Down
8 changes: 1 addition & 7 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,13 @@ maybe_stop_timer(#state{tref = undefined}) ->
ok;
maybe_stop_timer(#state{tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
consume_all_timer_ticks(delay_between_executions).
amoc_throttle_controller:consume_all_timer_ticks(delay_between_executions).

timeout(#state{delay_between_executions = infinity}) ->
infinity;
timeout(#state{delay_between_executions = Delay}) ->
Delay + ?DEFAULT_MSG_TIMEOUT.

consume_all_timer_ticks(Msg) ->
receive
Msg -> consume_all_timer_ticks(Msg)
after 0 -> ok
end.

maybe_run_fn(#state{schedule = [], schedule_reversed = []} = State) ->
State;
maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
Expand Down
9 changes: 2 additions & 7 deletions src/users/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,11 @@
-module(amoc_user).

%% API
-export([start_link/3]).
-export([stop/0, stop/2]).
-export([init/4]).

-type state() :: term().

-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
{ok, pid()} | {error, term()}.
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

-spec stop() -> ok.
stop() ->
stop(self(), false).
Expand All @@ -24,6 +18,7 @@ stop(Pid, Force) when is_pid(Pid) ->

-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
init(Parent, Scenario, Id, State) ->
proc_lib:init_ack(Parent, {ok, self()}),
amoc_controller:wait_user_rate(),
amoc_users_worker_sup:user_up(Parent, Id),
process_flag(trap_exit, true),
amoc_scenario:start(Scenario, Id, State).
Loading