Skip to content

Commit

Permalink
Optimise user supervision
Browse files Browse the repository at this point in the history
In the original implementation, the process is monitored by two supervisors, the
original amoc_users_sup, and the amoc_controller one. But this means that bursts
of ups and downs overflow the mailbox of _two_ processes, one of them being
actually the very critical controller.

With this reimplementation, only the amoc_users_sup tracks the processes, and
all requests from the controller are asynchronous. This ensures the controller
does not get blocked and can remain responsive to control requests.
  • Loading branch information
NelsonVides committed Dec 19, 2023
1 parent 8590a36 commit a906215
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 90 deletions.
2 changes: 1 addition & 1 deletion integration_test/extra_code_paths/path1/dummy_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ test_amoc_dist() ->
get_users_info(SlaveNodes) ->
Users = [{Node, Id} ||
Node <- SlaveNodes,
{Id, _Pid} <- rpc:call(Node, ets, tab2list, [amoc_users])],
{_Pid, Id} <- rpc:call(Node, ets, tab2list, [amoc_users])],
Ids = lists:usort([Id || {_, Id} <- Users]),
Nodes = lists:usort([Node || {Node, _} <- Users]),
N = length(Ids),
Expand Down
97 changes: 36 additions & 61 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(USERS_TABLE, amoc_users).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, positive_integer, 1},
Expand All @@ -17,7 +16,6 @@
update => {?MODULE, maybe_update_interarrival_timer, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
no_of_users = 0 :: user_count(),
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
Expand Down Expand Up @@ -69,6 +67,8 @@
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, positive_integer/1]).

-export([zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
Expand All @@ -77,6 +77,7 @@
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec start_link() -> {ok, pid()}.
start_link() ->
Expand Down Expand Up @@ -129,9 +130,16 @@ positive_integer(Interarrival) ->
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).

%% @private
-spec zero_users_running() -> ok.
zero_users_running() ->
gen_server:cast(?SERVER, zero_users_running).

%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

%% @private
-spec init([]) -> {ok, state()}.
init([]) ->
Expand Down Expand Up @@ -174,6 +182,9 @@ handle_call(_Request, _From, State) ->
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -182,9 +193,6 @@ handle_cast(_Msg, State) ->
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info({'DOWN', _, process, Pid, _}, State) ->
NewSate = handle_stop_user(Pid, State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.

Expand All @@ -209,12 +217,15 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
{ok, State#state{status = terminating}};
case amoc_users_sup:count_children() of
0 ->
terminate_scenario(State),
{ok, State#state{status = finished}};
_ ->
amoc_users_sup:terminate_all_children(),
{ok, State#state{status = terminating}}
end;
handle_stop_scenario(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

Expand Down Expand Up @@ -249,20 +260,17 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) ->

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
CountRemove = amoc_users_sup:stop_children(Count, ForceRemove),
amoc_telemetry:execute([controller, users], #{count => CountRemove},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
end,
amoc_users_sup:stop_children(Pids, ForceRemove),
{ok, length(Pids)};
{ok, CountRemove};
handle_remove(_Count, _ForceRemove, #state{status = Status}) ->
{error, {invalid_status, Status}}.

-spec handle_status(state()) -> amoc_status().
handle_status(#state{status = running, scenario = Scenario,
no_of_users = N, last_user_id = LastId}) ->
last_user_id = LastId}) ->
N = amoc_users_sup:count_children(),
{running, #{scenario => Scenario, currently_running_users => N, highest_user_id => LastId}};
handle_status(#state{status = terminating, scenario = Scenario}) ->
{terminating, Scenario};
Expand All @@ -279,33 +287,18 @@ handle_disable(#state{status = Status} = State) ->

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
no_of_users = N,
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
start_user(Scenario, UserId, ScenarioState),
State#state{create_users = T, no_of_users = N + 1};
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_stop_user(pid(), state()) -> state().
handle_stop_user(Pid, State) ->
case ets:match(?USERS_TABLE, {'$1', Pid}, 1) of
{[[UserId]], _} ->
ets:delete(?USERS_TABLE, UserId),
dec_no_of_users(State);
_ ->
State
end.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
-spec start_tables() -> ok.
start_tables() -> %% ETS creation
?USERS_TABLE = ets:new(?USERS_TABLE, [named_table,
ordered_set,
protected,
{read_concurrency, true}]),
amoc_config_utils:create_amoc_config_ets(),
ok.

Expand All @@ -321,6 +314,13 @@ init_scenario(Scenario, Settings) ->
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
amoc_scenario:terminate(Scenario, ScenarioState).

-spec handle_zero_users_running(state()) -> state().
handle_zero_users_running(#state{status = terminating} = State) ->
terminate_scenario(State),
State#state{status = finished};
handle_zero_users_running(State) ->
State.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
Expand All @@ -334,34 +334,9 @@ maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
-spec start_user(amoc:scenario(), amoc_scenario:user_id(), any()) -> term().
start_user(Scenario, Id, ScenarioState) ->
{ok, Pid} = supervisor:start_child(amoc_users_sup, [Scenario, Id, ScenarioState]),
ets:insert(?USERS_TABLE, {Id, Pid}),
erlang:monitor(process, Pid),
ok.

-spec terminate_all_users() -> any().
terminate_all_users() ->
%stop all the users
Match = ets:match_object(?USERS_TABLE, '$1', 200),
terminate_all_users(Match).

%% ets:continuation/0 type is unfortunately not exported from the ets module.
-spec terminate_all_users({tuple(), term()} | '$end_of_table') -> ok.
terminate_all_users({Objects, Continuation}) ->
Pids = [Pid || {_Id, Pid} <- Objects],
amoc_users_sup:stop_children(Pids, true),
Match = ets:match_object(Continuation),
terminate_all_users(Match);
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.
amoc_users_sup:start_child(Scenario, Id, ScenarioState).

-spec interarrival() -> interarrival().
interarrival() ->
Expand Down
4 changes: 2 additions & 2 deletions src/amoc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ start_link() ->
[ChildSpec :: supervisor:child_spec()]
}}.
init([]) ->
{ok, {{one_for_one, 5, 10},
{ok, {{rest_for_one, 5, 10},
[
?CHILD(amoc_users_sup, supervisor),
?CHILD(amoc_users_sup, worker),
?CHILD(amoc_controller, worker),
?CHILD(amoc_cluster, worker),
?CHILD(amoc_code_server, worker),
Expand Down
2 changes: 1 addition & 1 deletion src/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-type state() :: term().

-spec start_link(amoc:scenario(), amoc_scenario:user_id(), state()) ->
{ok, pid()}.
{ok, pid()} | {error, term()}.
start_link(Scenario, Id, State) ->
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).

Expand Down
Loading

0 comments on commit a906215

Please sign in to comment.