diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index 97b11c62..a129b49d 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -5,7 +5,7 @@ %% API -export([start/2, stop/1, send/2, send/3, wait/1, - run/2, pause/1, resume/1, + run/2, pause/1, resume/1, unlock/1, change_rate/2, change_rate/3, change_rate_gradually/2, change_rate_gradually/6]). @@ -74,6 +74,11 @@ pause(Name) -> resume(Name) -> amoc_throttle_controller:resume(Name). +%% @doc Unlocks executions for the given `Name' as if `Rate' was set to `infinity'. +-spec unlock(name()) -> ok | {error, any()}. +unlock(Name) -> + amoc_throttle_controller:unlock(Name). + %% @doc Sets `Throttle' for `Name' according to the given values. %% %% Can change whether Amoc throttle limits `Name' to parallel executions or to `Rate' per `Interval', diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index 58818846..225db715 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -9,7 +9,7 @@ %% API -export([start_link/0, ensure_throttle_processes_started/2, - pause/1, resume/1, stop/1, + pause/1, resume/1, unlock/1, stop/1, change_rate/3, change_rate_gradually/2, raise_event_on_slave_node/2, telemetry_event/2]). @@ -112,6 +112,10 @@ pause(Name) -> resume(Name) -> gen_server:call(?MASTER_SERVER, {resume, Name}). +-spec unlock(name()) -> ok | {error, any()}. +unlock(Name) -> + gen_server:call(?MASTER_SERVER, {unlock, Name}). + -spec change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}. change_rate(Name, Rate, Interval) -> gen_server:call(?MASTER_SERVER, {change_rate, Name, Rate, Interval}). @@ -362,7 +366,9 @@ run_cmd(Pid, stop) -> run_cmd(Pid, pause) -> amoc_throttle_process:pause(Pid); run_cmd(Pid, resume) -> - amoc_throttle_process:resume(Pid). + amoc_throttle_process:resume(Pid); +run_cmd(Pid, unlock) -> + amoc_throttle_process:unlock(Pid). -spec verify_config(amoc_throttle:gradual_rate_config()) -> gradual_rate_change() | {error, any()}. verify_config(Config) -> diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index 90ab1689..4352ce1f 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -12,6 +12,7 @@ update/3, pause/1, resume/1, + unlock/1, get_state/1, get_throttle_process/1, get_throttle_processes/1 @@ -30,7 +31,7 @@ -define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute -record(state, {can_run_fn = true :: boolean(), - status = running :: running | paused, + status = running :: running | paused | unlocked, max_n :: infinity | non_neg_integer(), name :: atom(), n :: infinity | non_neg_integer(), @@ -69,6 +70,10 @@ pause(Pid) -> resume(Pid) -> gen_server:cast(Pid, resume_process). +-spec unlock(pid()) -> ok. +unlock(Pid) -> + gen_server:cast(Pid, unlock_process). + -spec get_state(pid()) -> map(). get_state(Pid) -> gen_server:call(Pid, get_state). @@ -123,6 +128,8 @@ handle_cast(pause_process, State) -> {noreply, State#state{status = paused}, {continue, maybe_run_fn}}; handle_cast(resume_process, State) -> {noreply, State#state{status = running}, {continue, maybe_run_fn}}; +handle_cast(unlock_process, State) -> + {noreply, State#state{status = unlocked}, {continue, maybe_run_fn}}; handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; @@ -212,6 +219,9 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) -> NewSchedule = lists:reverse(SchRev), NewState = State#state{schedule = NewSchedule, schedule_reversed = []}, maybe_run_fn(NewState); +maybe_run_fn(#state{interval = _, status = unlocked, n = N} = State) when N > 0 -> + NewState = run_fn(State), + maybe_run_fn(NewState); maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 -> NewState = run_fn(State), maybe_run_fn(NewState); diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 2804a74a..6560cfe2 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -34,7 +34,7 @@ groups() -> async_runner_dies_while_waiting_raises_exit, async_runner_dies_when_throttler_dies, run_with_interval_zero_limits_only_number_of_parallel_executions, - pause_and_resume, + pause_and_resume_and_unlock, get_state ]} ]. @@ -286,7 +286,7 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> amoc_throttle:send(?FUNCTION_NAME, receive_this), ?assertMatch(ok, ?RECV(receive_this, 200)). -pause_and_resume(_) -> +pause_and_resume_and_unlock(_) -> %% Start 100-per-10ms throttle with a single process ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, @@ -299,7 +299,17 @@ pause_and_resume(_) -> ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)), %% After resume the message is then received ?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)), - ?assertMatch(ok, ?RECV(receive_this, 200)). + ?assertMatch(ok, ?RECV(receive_this, 200)), + %% If unlocked, all messages are always received + ?assertMatch(ok, amoc_throttle:unlock(?FUNCTION_NAME)), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch(ok, ?RECV(receive_this, 200)), + %% From unlock it can resume + ?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)), + State = get_state_of_one_process(?FUNCTION_NAME), + ?assertMatch(#{name := ?FUNCTION_NAME, + delay_between_executions := 12}, + State). get_state(_) -> ?assertMatch({ok, started},