Skip to content

Commit

Permalink
Handle rabbit_amqqueue:internal_delete/2 failures in quorum queues
Browse files Browse the repository at this point in the history
(cherry picked from commit 4a8d01e)
  • Loading branch information
the-mikedavis authored and mergify[bot] committed Aug 27, 2024
1 parent aa778aa commit c9ad0cd
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -809,19 +809,24 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
ok = force_delete_queue(Servers)
end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(Q, ActingUser),
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
case delete_queue_data(Q, ActingUser) of
ok ->
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
end, Errs) of
true ->
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs};
ok;
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
Expand All @@ -830,9 +835,15 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
" Attempting force delete.",
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(Q, ActingUser),
{ok, ReadyMsgs}
notify_decorators(QName, shutdown)
end,
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
end
end.

Expand All @@ -850,9 +861,13 @@ force_delete_queue(Servers) ->
end || S <- Servers],
ok.

-spec delete_queue_data(Queue, ActingUser) -> Ret when
Queue :: amqqueue:amqqueue(),
ActingUser :: rabbit_types:username(),
Ret :: ok | {error, timeout}.

delete_queue_data(Queue, ActingUser) ->
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
ok.
rabbit_amqqueue:internal_delete(Queue, ActingUser).


delete_immediately(Queue) ->
Expand Down

0 comments on commit c9ad0cd

Please sign in to comment.