Skip to content

Commit

Permalink
Rework removal assignments more efficiently
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Feb 21, 2024
1 parent 8458b36 commit 7effd57
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 32 deletions.
74 changes: 42 additions & 32 deletions src/users/amoc_users_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,6 @@ get_sup_for_user_id(Id) ->
Index = Id rem tuple_size(Supervisors) + 1,
element(Index, Supervisors).

%% assign how many users each worker will be requested to remove,
%% taking care of the fact that worker might not have enough users
assign_counts(Total) ->
#storage{user_count = Atomics, sups = Sups} = persistent_term:get(?MODULE),
NumOfSupervisors = tuple_size(Sups),
Supervisors = tuple_to_list(Sups),
UsersPerSup = [ atomics:get(Atomics, SupPos) || SupPos <- lists:seq(2, NumOfSupervisors + 1) ],
SupervisorsWithCounts = lists:zip(Supervisors, UsersPerSup),
SupervisorWithPositiveCounts = [ T || T = {_, Count} <- SupervisorsWithCounts, Count =/= 0],
Data = maps:from_list(SupervisorWithPositiveCounts),
distribute(#{}, Data, SupervisorWithPositiveCounts, Total).

-spec distribute(#{pid() := count()}, #{pid() := count()}, [{pid(), count()}], count()) ->
{count(), [{pid(), count()}]}.
%% Assigned all or not enough active users, already assigned all possible
distribute(Acc, Data, _, Left) when 0 =:= Left; 0 =:= map_size(Data) ->
{lists:sum(maps:values(Acc)), maps:to_list(Acc)};
%% Already assigned one round and still have counts left and running users available, loop again
distribute(Acc, Data, [], Left) ->
distribute(Acc, Data, maps:to_list(Data), Left);
distribute(Acc, Data, [{Sup, Count} | Rest], Left) ->
NewAcc = maps:put(Sup, maps:get(Sup, Acc, 0) + 1, Acc),
NewData = case Count of
1 ->
%% Assigning last possible user to this sup, remove from data
maps:remove(Sup, Data);
_ ->
%% Assign one more to this sup and continue assigning
maps:put(Sup, Count - 1, Data)
end,
distribute(NewAcc, NewData, Rest, Left - 1).

%% assign which users each worker will be requested to add
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
Expand All @@ -177,3 +145,45 @@ assign_users_to_sups(NumOfSupervisors, Supervisors, [Id | Ids], Acc) ->
assign_users_to_sups(NumOfSupervisors, Supervisors, Ids, NewAcc);
assign_users_to_sups(_, _, [], Acc) ->
Acc.

%% assign how many users each worker will be requested to remove,
%% taking care of the fact that worker might not have enough users.
-spec assign_counts(count()) -> {count(), [{pid(), count()}]}.
assign_counts(Total) ->
#storage{user_count = Atomics, sups = Sups} = persistent_term:get(?MODULE),
NumOfSupervisors = tuple_size(Sups),
Average = Total div NumOfSupervisors,
Rem = Total rem NumOfSupervisors,
Supervisors = tuple_to_list(Sups),
Indexes = lists:seq(2, NumOfSupervisors + 1),
IndexedSups = lists:zip(Supervisors, Indexes),
{_, _, Indexed} = lists:foldr(fun assign_counts_to_sup/2, {Average, Rem, []}, IndexedSups),
{_, _, CorrectedCounts} = lists:foldl(fun correct_counts/2, {Atomics, Rem, []}, Indexed),
{_, Totals} = lists:unzip(CorrectedCounts),
{lists:sum(Totals), CorrectedCounts}.

-spec assign_counts_to_sup({pid(), pos_integer()}, {count(), non_neg_integer(), Acc}) ->
{count(), non_neg_integer(), Acc}
when Acc :: [{pid(), pos_integer(), count()}].
assign_counts_to_sup({Sup, SupPos}, {Average, 0, Acc}) ->
{Average, 0, [{Sup, SupPos, Average} | Acc]};
assign_counts_to_sup({Sup, SupPos}, {Average, Rem, Acc}) ->
{Average, Rem - 1, [{Sup, SupPos, Average + 1} | Acc]}.

-spec correct_counts({pid(), pos_integer(), count()},
{atomics:atomics_ref(), non_neg_integer(), Acc}) ->
{atomics:atomics_ref(), non_neg_integer(), Acc} when Acc :: [{pid(), count()}].
correct_counts({Sup, SupPos, CurrentAssignment}, {Atomics, Rem, Acc}) ->
ChildrenInSup = atomics:get(Atomics, SupPos),
Excess = min(Rem, abs(CurrentAssignment - ChildrenInSup)),
case {ChildrenInSup >= CurrentAssignment, Rem, Excess} of
%% We overassigned, simply kill all this sup has and increase the remainings
{false, Excess, _} ->
{Atomics, Rem + Excess, [{Sup, ChildrenInSup} | Acc]};
%% Sup has capacity and nothing was remaining to assign
{true, 0, _} ->
{Atomics, Rem, [{Sup, CurrentAssignment} | Acc]};
%% We have something remaining, Excess is as much as this supervisor can take
{true, Rem, Excess} ->
{Atomics, Rem - Excess, [{Sup, CurrentAssignment + Excess} | Acc]}
end.
2 changes: 2 additions & 0 deletions src/users/amoc_users_worker_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ handle_cast({start_child, Scenario, Id, ScenarioState}, State) ->
handle_cast({start_children, Scenario, Ids, ScenarioState}, State) ->
[ do_start_child(Scenario, Id, ScenarioState, State) || Id <- Ids],
{noreply, State};
handle_cast({stop_children, 0, _}, State) ->
{noreply, State};
handle_cast({stop_children, Int, ForceRemove}, #state{tid = Tid} = State) ->
Pids = case ets:match_object(Tid, '$1', Int) of
'$end_of_table' ->
Expand Down

0 comments on commit 7effd57

Please sign in to comment.