From b55455ccb875940f7e29d0bb131b0f9d434fb849 Mon Sep 17 00:00:00 2001 From: Nelson Vides Date: Wed, 21 Feb 2024 11:45:37 +0100 Subject: [PATCH] Rework removal assignments more efficiently --- src/users/amoc_users_sup.erl | 74 ++++++++++++++++------------- src/users/amoc_users_worker_sup.erl | 2 + 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/users/amoc_users_sup.erl b/src/users/amoc_users_sup.erl index 59d85060..b1ac3d8c 100644 --- a/src/users/amoc_users_sup.erl +++ b/src/users/amoc_users_sup.erl @@ -134,38 +134,6 @@ get_sup_for_user_id(Id) -> Index = Id rem tuple_size(Supervisors) + 1, element(Index, Supervisors). -%% assign how many users each worker will be requested to remove, -%% taking care of the fact that worker might not have enough users -assign_counts(Total) -> - #storage{user_count = Atomics, sups = Sups} = persistent_term:get(?MODULE), - NumOfSupervisors = tuple_size(Sups), - Supervisors = tuple_to_list(Sups), - UsersPerSup = [ atomics:get(Atomics, SupPos) || SupPos <- lists:seq(2, NumOfSupervisors + 1) ], - SupervisorsWithCounts = lists:zip(Supervisors, UsersPerSup), - SupervisorWithPositiveCounts = [ T || T = {_, Count} <- SupervisorsWithCounts, Count =/= 0], - Data = maps:from_list(SupervisorWithPositiveCounts), - distribute(#{}, Data, SupervisorWithPositiveCounts, Total). - --spec distribute(#{pid() := count()}, #{pid() := count()}, [{pid(), count()}], count()) -> - {count(), [{pid(), count()}]}. -%% Assigned all or not enough active users, already assigned all possible -distribute(Acc, Data, _, Left) when 0 =:= Left; 0 =:= map_size(Data) -> - {lists:sum(maps:values(Acc)), maps:to_list(Acc)}; -%% Already assigned one round and still have counts left and running users available, loop again -distribute(Acc, Data, [], Left) -> - distribute(Acc, Data, maps:to_list(Data), Left); -distribute(Acc, Data, [{Sup, Count} | Rest], Left) -> - NewAcc = maps:put(Sup, maps:get(Sup, Acc, 0) + 1, Acc), - NewData = case Count of - 1 -> - %% Assigning last possible user to this sup, remove from data - maps:remove(Sup, Data); - _ -> - %% Assign one more to this sup and continue assigning - maps:put(Sup, Count - 1, Data) - end, - distribute(NewAcc, NewData, Rest, Left - 1). - %% assign which users each worker will be requested to add -spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) -> Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}. @@ -177,3 +145,45 @@ assign_users_to_sups(NumOfSupervisors, Supervisors, [Id | Ids], Acc) -> assign_users_to_sups(NumOfSupervisors, Supervisors, Ids, NewAcc); assign_users_to_sups(_, _, [], Acc) -> Acc. + +%% assign how many users each worker will be requested to remove, +%% taking care of the fact that worker might not have enough users. +-spec assign_counts(count()) -> {count(), [{pid(), count()}]}. +assign_counts(Total) -> + #storage{user_count = Atomics, sups = Sups} = persistent_term:get(?MODULE), + NumOfSupervisors = tuple_size(Sups), + Average = Total div NumOfSupervisors, + Rem = Total rem NumOfSupervisors, + Supervisors = tuple_to_list(Sups), + Indexes = lists:seq(2, NumOfSupervisors + 1), + IndexedSups = lists:zip(Supervisors, Indexes), + {_, _, Indexed} = lists:foldr(fun assign_counts_to_sup/2, {Average, Rem, []}, IndexedSups), + {_, _, CorrectedCounts} = lists:foldl(fun correct_counts/2, {Atomics, Rem, []}, Indexed), + {_, Totals} = lists:unzip(CorrectedCounts), + {lists:sum(Totals), CorrectedCounts}. + +-spec assign_counts_to_sup({pid(), pos_integer()}, {count(), non_neg_integer(), Acc}) -> + {count(), non_neg_integer(), Acc} + when Acc :: [{pid(), pos_integer(), count()}]. +assign_counts_to_sup({Sup, SupPos}, {Average, 0, Acc}) -> + {Average, 0, [{Sup, SupPos, Average} | Acc]}; +assign_counts_to_sup({Sup, SupPos}, {Average, Rem, Acc}) -> + {Average, Rem - 1, [{Sup, SupPos, Average + 1} | Acc]}. + +-spec correct_counts({pid(), pos_integer(), count()}, + {atomics:atomics_ref(), non_neg_integer(), Acc}) -> + {atomics:atomics_ref(), non_neg_integer(), Acc} when Acc :: [{pid(), count()}]. +correct_counts({Sup, SupPos, CurrentAssignment}, {Atomics, Rem, Acc}) -> + ChildrenInSup = atomics:get(Atomics, SupPos), + Excess = min(Rem, abs(CurrentAssignment - ChildrenInSup)), + case {ChildrenInSup >= CurrentAssignment, Rem, Excess} of + %% We overassigned, simply kill all this sup has and increase the remainings + {false, _, Excess} -> + {Atomics, Rem + Excess, [{Sup, ChildrenInSup} | Acc]}; + %% Sup has capacity and nothing was remaining to assign + {true, 0, _} -> + {Atomics, Rem, [{Sup, CurrentAssignment} | Acc]}; + %% We have something remaining, Excess is as much as this supervisor can take + {true, Rem, Excess} -> + {Atomics, Rem - Excess, [{Sup, CurrentAssignment + Excess} | Acc]} + end. diff --git a/src/users/amoc_users_worker_sup.erl b/src/users/amoc_users_worker_sup.erl index 696ac450..16509c76 100644 --- a/src/users/amoc_users_worker_sup.erl +++ b/src/users/amoc_users_worker_sup.erl @@ -84,6 +84,8 @@ handle_cast({start_child, Scenario, Id, ScenarioState}, State) -> handle_cast({start_children, Scenario, Ids, ScenarioState}, State) -> [ do_start_child(Scenario, Id, ScenarioState, State) || Id <- Ids], {noreply, State}; +handle_cast({stop_children, 0, _}, State) -> + {noreply, State}; handle_cast({stop_children, Int, ForceRemove}, #state{tid = Tid} = State) -> Pids = case ets:match_object(Tid, '$1', Int) of '$end_of_table' ->