Skip to content

Commit

Permalink
Unify duplicated function
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 2, 2024
1 parent d865393 commit 8886183
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 11 deletions.
9 changes: 5 additions & 4 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ensure_throttle_processes_started/2,
pause/1, resume/1, stop/1, get_info/1,
change_rate/2, change_rate_gradually/2,
pg_scope/0,
pg_scope/0, consume_all_messages/1,
get_throttle_process/1,
raise_event_on_slave_node/2, telemetry_event/2]).

Expand Down Expand Up @@ -260,16 +260,17 @@ continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate]} = Plan) ->
TRef = Plan#change_rate_plan.timer,
Info1 = do_change_rate(Name, Rate, Interval, Info),
{ok, cancel} = timer:cancel(TRef),
consume_all_timer_ticks({change_plan, Name}),
consume_all_messages({change_plan, Name}),
State#{Name => Info1#throttle_info{change_plan = undefined}};
continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate | Rates]} = Plan) ->
Info1 = do_change_rate(Name, Rate, Info#throttle_info.interval, Info),
NewPlan = Plan#change_rate_plan{rates = Rates},
State#{Name => Info1#throttle_info{change_plan = NewPlan}}.

consume_all_timer_ticks(Msg) ->
-spec consume_all_messages(any()) -> ok.
consume_all_messages(Msg) ->
receive
Msg -> consume_all_timer_ticks(Msg)
Msg -> consume_all_messages(Msg)
after 0 -> ok
end.

Expand Down
8 changes: 1 addition & 7 deletions src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,13 @@ maybe_stop_timer(#state{tref = undefined}) ->
ok;
maybe_stop_timer(#state{tref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
consume_all_timer_ticks(delay_between_executions).
amoc_throttle_controller:consume_all_messages(delay_between_executions).

timeout(#state{delay_between_executions = infinity}) ->
infinity;
timeout(#state{delay_between_executions = Delay}) ->
Delay + ?DEFAULT_MSG_TIMEOUT.

consume_all_timer_ticks(Msg) ->
receive
Msg -> consume_all_timer_ticks(Msg)
after 0 -> ok
end.

maybe_run_fn(#state{schedule = [], schedule_reversed = []} = State) ->
State;
maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
Expand Down

0 comments on commit 8886183

Please sign in to comment.