Skip to content

Commit

Permalink
Merge pull request #12130 from rabbitmq/mergify/bp/v4.0.x/pr-12082
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis authored Aug 27, 2024
2 parents 7aef123 + c9ad0cd commit 710e83f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 30 deletions.
12 changes: 11 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1669,6 +1669,11 @@ delete_with(QueueName, ConnPid, IfUnused, IfEmpty, Username, CheckExclusive) whe
{error, {exit, _, _}} ->
%% delete()/delegate:invoke might return {error, {exit, _, _}}
{ok, 0};
{error, timeout} ->
rabbit_misc:protocol_error(
internal_error,
"The operation to delete the queue from the metadata store "
"timed out", []);
{ok, Count} ->
{ok, Count};
{protocol_error, Type, Reason, ReasonArgs} ->
Expand Down Expand Up @@ -1777,7 +1782,10 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast,
[{resume, ChPid}]}).

-spec internal_delete(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
-spec internal_delete(Queue, ActingUser) -> Ret when
Queue :: amqqueue:amqqueue(),
ActingUser :: rabbit_types:username(),
Ret :: ok | {error, timeout}.

internal_delete(Queue, ActingUser) ->
internal_delete(Queue, ActingUser, normal).
Expand All @@ -1787,6 +1795,8 @@ internal_delete(Queue, ActingUser, Reason) ->
case rabbit_db_queue:delete(QueueName, Reason) of
ok ->
ok;
{error, timeout} = Err ->
Err;
Deletions ->
_ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
Expand Down
40 changes: 25 additions & 15 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
end, State);
terminate({shutdown, missing_owner = Reason}, {{reply_to, From}, #q{q = Q} = State}) ->
%% if the owner was missing then there will be no queue, so don't emit stats
State1 = terminate_shutdown(terminate_delete(false, Reason, State), State),
State1 = terminate_shutdown(terminate_delete(false, Reason, none, State), State),
send_reply(From, {owner_died, Q}),
State1;
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
Expand All @@ -310,18 +310,22 @@ terminate(normal, State = #q{status = {terminated_by, auto_delete}}) ->
%% thousands of queues. A optimisation introduced by server#1513
%% needs to be reverted by this case, avoiding to guard the delete
%% operation on `rabbit_durable_queue`
terminate_shutdown(terminate_delete(true, auto_delete, State), State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
terminate_shutdown(terminate_delete(true, auto_delete, none, State), State);
terminate(normal, {{reply_to, ReplyTo}, State}) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, ReplyTo, State), State);
terminate(normal, State) ->
terminate_shutdown(terminate_delete(true, normal, none, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
Q2 = amqqueue:set_state(Q, crashed),
%% When mnesia is removed this update can become
%% an async Khepri command.
_ = rabbit_amqqueue:store_queue(Q2),
BQS
end, State).

terminate_delete(EmitStats, Reason0,
terminate_delete(EmitStats, Reason0, ReplyTo,
State = #q{q = Q,
backing_queue = BQ,
status = Status}) ->
Expand All @@ -332,19 +336,24 @@ terminate_delete(EmitStats, Reason0,
missing_owner -> normal;
Any -> Any
end,
Len = BQ:len(BQS),
BQS1 = BQ:delete_and_terminate(Reason, BQS),
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
fun() -> emit_stats(State) end);
true -> ok
end,
%% This try-catch block transforms throws to errors since throws are not
%% logged.
try
%% don't care if the internal delete doesn't return 'ok'.
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0)
catch
{error, ReasonE} -> error(ReasonE)
end,
%% logged. When mnesia is removed this `try` can be removed: Khepri
%% returns errors as error tuples instead.
Reply = try rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0) of
ok ->
{ok, Len};
{error, _} = Err ->
Err
catch
{error, ReasonE} -> error(ReasonE)
end,
send_reply(ReplyTo, Reply),
BQS1
end.

Expand Down Expand Up @@ -1396,15 +1405,16 @@ handle_call(stat, _From, State) ->
ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), rabbit_queue_consumers:count()}, State1);

handle_call({delete, IfUnused, IfEmpty, ActingUser}, _From,
handle_call({delete, IfUnused, IfEmpty, ActingUser}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop({ok, BQ:len(BQS)},
State#q{status = {terminated_by, ActingUser}})
true ->
State1 = State#q{status = {terminated_by, ActingUser}},
stop({{reply_to, From}, State1})
end;

handle_call(purge, _From, State = #q{backing_queue = BQ,
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,9 @@ list_for_count_in_khepri(VHostName) ->
-spec delete(QName, Reason) -> Ret when
QName :: rabbit_amqqueue:name(),
Reason :: atom(),
Ret :: ok | Deletions :: rabbit_binding:deletions().
Ret :: ok |
Deletions :: rabbit_binding:deletions() |
rabbit_khepri:timeout_error().

delete(QueueName, Reason) ->
rabbit_khepri:handle_fallback(
Expand Down
37 changes: 26 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -809,19 +809,24 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
case delete_queue_data(Q, ActingUser) of
ok ->
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs};
ok;
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
Expand All @@ -830,9 +835,15 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
notify_decorators(QName, shutdown)
end,
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
end
end.

Expand All @@ -850,9 +861,13 @@ force_delete_queue(Servers) ->
end || S <- Servers],
ok.

-spec delete_queue_data(Queue, ActingUser) -> Ret when
Queue :: amqqueue:amqqueue(),
ActingUser :: rabbit_types:username(),
Ret :: ok | {error, timeout}.

delete_queue_data(Queue, ActingUser) ->
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
ok.
rabbit_amqqueue:internal_delete(Queue, ActingUser).


delete_immediately(Queue) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
{ok, Reply} ->
Reply;
Error ->
{protocol_error, internal_error, "Cannot delete queue '~ts' on node '~ts': ~255p ",
{protocol_error, internal_error, "Cannot delete ~ts on node '~ts': ~255p ",
[rabbit_misc:rs(amqqueue:get_name(Q)), node(), Error]}
end.

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ assert_benign({error, not_found}, _) -> ok;
assert_benign({error, {absent, Q, _}}, ActingUser) ->
%% Removing the database entries here is safe. If/when the down node
%% restarts, it will clear out the on-disk storage of the queue.
rabbit_amqqueue:internal_delete(Q, ActingUser).
ok = rabbit_amqqueue:internal_delete(Q, ActingUser).

-spec exists(vhost:name()) -> boolean().

Expand Down
27 changes: 27 additions & 0 deletions deps/rabbit/test/cluster_minority_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ groups() ->
declare_binding,
delete_binding,
declare_queue,
delete_queue,
publish_to_exchange,
publish_and_consume_to_local_classic_queue,
consume_from_queue,
Expand Down Expand Up @@ -97,6 +98,16 @@ init_per_group(Group, Config0) when Group == client_operations;
%% To be used in consume_from_queue
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue">>,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
%% To be used in consume_from_queue
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue-delete-classic">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}]}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue-delete-stream">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue-delete-quorum">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]}),
%% To be used in delete_binding
#'exchange.bind_ok'{} = amqp_channel:call(Ch, #'exchange.bind'{destination = <<"amq.fanout">>,
source = <<"amq.direct">>,
Expand Down Expand Up @@ -188,6 +199,22 @@ declare_queue(Config) ->
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'queue.declare'{queue = <<"test-queue-2">>})).

delete_queue(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
{ok, Ch1} = amqp_connection:open_channel(Conn1),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch1, #'queue.delete'{queue = <<"test-queue-delete-classic">>})),
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
{ok, Ch2} = amqp_connection:open_channel(Conn2),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch2, #'queue.delete'{queue = <<"test-queue-delete-stream">>})),
Conn3 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, A),
{ok, Ch3} = amqp_connection:open_channel(Conn3),
?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch3, #'queue.delete'{queue = <<"test-queue-delete-quorum">>})),
ok.

publish_to_exchange(Config) ->
[A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
Expand Down

0 comments on commit 710e83f

Please sign in to comment.