diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index faa4dd28e6b3..6dd2ae7d01cf 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -67,8 +67,8 @@ init() -> end, Ret = case rabbit_khepri:is_enabled() of - true -> init_using_khepri(); - false -> init_using_mnesia() + true -> init_using_khepri(IsVirgin); + false -> init_using_mnesia(IsVirgin) end, case Ret of ok -> @@ -91,7 +91,7 @@ pre_init(IsVirgin) -> OtherMembers = rabbit_nodes:nodes_excl_me(Members), rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin). -init_using_mnesia() -> +init_using_mnesia(_IsVirgin) -> ?LOG_DEBUG( "DB: initialize Mnesia", #{domain => ?RMQLOG_DOMAIN_DB}), @@ -99,11 +99,11 @@ init_using_mnesia() -> ?assertEqual(rabbit:data_dir(), mnesia_dir()), rabbit_sup:start_child(mnesia_sync). -init_using_khepri() -> +init_using_khepri(IsVirgin) -> ?LOG_DEBUG( "DB: initialize Khepri", #{domain => ?RMQLOG_DOMAIN_DB}), - rabbit_khepri:init(). + rabbit_khepri:init(IsVirgin). init_finished() -> %% Used during initialisation by rabbit_logger_exchange_h.erl diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index d2cece80fabc..942b3a648110 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -53,7 +53,7 @@ -define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route). -define(MNESIA_REVERSE_TABLE, rabbit_reverse_route). -define(MNESIA_INDEX_TABLE, rabbit_index_route). --define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings). +-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding). -define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route). %% ------------------------------------------------------------------- @@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = #if_has_data{}), - {ok, Bindings} = khepri_tx:get_many(Path), - ok = khepri_tx:delete_many(Path), - maps:fold(fun(_P, Set, Acc) -> + {ok, Bindings} = khepri_tx_adv:delete_many(Path), + maps:fold(fun(_P, #{data := Set}, Acc) -> sets:to_list(Set) ++ Acc end, [], Bindings). @@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) -> OnlyDurable :: boolean(), Deletions :: rabbit_binding:deletions(). -delete_for_destination_in_khepri(DstName, OnlyDurable) -> - BindingsMap = match_destination_in_khepri(DstName), - maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap), - Bindings = maps:fold(fun(_, Set, Acc) -> +delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) -> + Pattern = khepri_route_path( + VHost, + _SrcName = ?KHEPRI_WILDCARD_STAR, + Kind, + Name, + _RoutingKey = ?KHEPRI_WILDCARD_STAR), + {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), + Bindings = maps:fold(fun(_, #{data := Set}, Acc) -> sets:to_list(Set) ++ Acc end, [], BindingsMap), rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). -match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) -> - Path = khepri_route_path( - VHost, - _SrcName = ?KHEPRI_WILDCARD_STAR, - Kind, - Name, - _RoutingKey = ?KHEPRI_WILDCARD_STAR), - {ok, Map} = khepri_tx:get_many(Path), - Map. - %% ------------------------------------------------------------------- %% delete_transient_for_destination_in_mnesia(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 326534385bc5..f8c37a22428f 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -26,6 +26,7 @@ peek_serial/1, next_serial/1, delete/2, + delete_all/1, delete_serial/1, recover/1, match/1, @@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour ok = khepri_tx:delete(khepri_exchange_path(XName)), rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource). +%% ------------------------------------------------------------------- +%% delete_all(). +%% ------------------------------------------------------------------- + +-spec delete_all(VHostName) -> Ret when + VHostName :: vhost:name(), + Deletions :: rabbit_binding:deletions(), + Ret :: {ok, Deletions}. +%% @doc Deletes all exchanges for a given vhost. +%% +%% @returns an `{ok, Deletions}' tuple containing the {@link +%% rabbit_binding:deletions()} caused by deleting the exchanges under the given +%% vhost. +%% +%% @private + +delete_all(VHostName) -> + rabbit_khepri:handle_fallback( + #{mnesia => fun() -> delete_all_in_mnesia(VHostName) end, + khepri => fun() -> delete_all_in_khepri(VHostName) end + }). + +delete_all_in_mnesia(VHostName) -> + rabbit_mnesia:execute_mnesia_transaction( + fun() -> + delete_all_in_mnesia_tx(VHostName) + end). + +delete_all_in_mnesia_tx(VHostName) -> + Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'}, + Xs = mnesia:match_object(?MNESIA_TABLE, Match, write), + Deletions = + lists:foldl( + fun(X, Acc) -> + {deleted, #exchange{name = XName}, Bindings, XDeletions} = + unconditional_delete_in_mnesia( X, false), + XDeletions1 = rabbit_binding:add_deletion( + XName, {X, deleted, Bindings}, XDeletions), + rabbit_binding:combine_deletions(Acc, XDeletions1) + end, rabbit_binding:new_deletions(), Xs), + {ok, Deletions}. + +delete_all_in_khepri(VHostName) -> + rabbit_khepri:transaction( + fun() -> + delete_all_in_khepri_tx(VHostName) + end, rw, #{timeout => infinity}). + +delete_all_in_khepri_tx(VHostName) -> + Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR), + {ok, NodeProps} = khepri_tx_adv:delete_many(Pattern), + Deletions = + maps:fold( + fun(_Path, #{data := X}, Deletions) -> + {deleted, #exchange{name = XName}, Bindings, XDeletions} = + rabbit_db_binding:delete_all_for_exchange_in_khepri( + X, false, true), + Deletions1 = rabbit_binding:add_deletion( + XName, {X, deleted, Bindings}, XDeletions), + rabbit_binding:combine_deletions(Deletions, Deletions1) + end, rabbit_binding:new_deletions(), NodeProps), + {ok, Deletions}. + %% ------------------------------------------------------------------- %% delete_serial(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index d241c72e540e..f57642ee953b 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -23,8 +23,8 @@ ]). -define(MNESIA_TABLE, rabbit_runtime_parameters). --define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparams). --define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparams). +-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparam). +-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparam). -define(any(Value), case Value of '_' -> ?KHEPRI_WILDCARD_STAR; _ -> Value diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index a717e69337b3..af72080be9c1 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -75,8 +75,8 @@ -define(MNESIA_TABLE, rabbit_user). -define(PERM_MNESIA_TABLE, rabbit_user_permission). -define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission). --define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users). --define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions). +-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_user). +-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permission). %% ------------------------------------------------------------------- %% create(). diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index 5a00d4de80da..b4037f9a8078 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -14,7 +14,7 @@ update_scratch/3, update_decorators/2, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, route/2, route/3, delete/3, validate_binding/2, count/0, - ensure_deleted/3]). + ensure_deleted/3, delete_all/2]). -export([list_names/0]). -export([serialise_events/1]). -export([serial/1, peek_serial/1]). @@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) -> XName#resource.name, Username) end. +-spec delete_all(VHostName, ActingUser) -> Ret when + VHostName :: vhost:name(), + ActingUser :: rabbit_types:username(), + Ret :: ok. + +delete_all(VHostName, ActingUser) -> + {ok, Deletions} = rabbit_db_exchange:delete_all(VHostName), + Deletions1 = rabbit_binding:process_deletions(Deletions), + rabbit_binding:notify_deletions(Deletions1, ActingUser), + ok. + process_deletions({error, _} = E) -> E; process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) -> diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 913b4de80d5f..d8f35e990fba 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -87,6 +87,8 @@ -module(rabbit_khepri). +-feature(maybe_expr, enable). + -include_lib("kernel/include/logger.hrl"). -include_lib("stdlib/include/assert.hrl"). @@ -98,7 +100,8 @@ -export([setup/0, setup/1, - init/0, + register_projections/0, + init/1, can_join_cluster/1, add_member/2, remove_member/1, @@ -267,7 +270,6 @@ setup(_) -> RetryTimeout = retry_timeout(), case khepri_cluster:wait_for_leader(?STORE_ID, RetryTimeout) of ok -> - wait_for_register_projections(), ?LOG_DEBUG( "Khepri-based " ?RA_FRIENDLY_NAME " ready", #{domain => ?RMQLOG_DOMAIN_GLOBAL}), @@ -287,49 +289,46 @@ retry_timeout() -> undefined -> 30000 end. -retry_limit() -> - case application:get_env(rabbit, khepri_leader_wait_retry_limit) of - {ok, T} -> T; - undefined -> 10 - end. - -wait_for_register_projections() -> - wait_for_register_projections(retry_timeout(), retry_limit()). - -wait_for_register_projections(_Timeout, 0) -> - exit(timeout_waiting_for_khepri_projections); -wait_for_register_projections(Timeout, Retries) -> - rabbit_log:info("Waiting for Khepri projections for ~tp ms, ~tp retries left", - [Timeout, Retries - 1]), - try - register_projections() - catch - throw : timeout -> - wait_for_register_projections(Timeout, Retries -1) - end. - %% @private --spec init() -> Ret when +-spec init(IsVirgin) -> Ret when + IsVirgin :: boolean(), Ret :: ok | timeout_error(). -init() -> +init(IsVirgin) -> case members() of [] -> timer:sleep(1000), - init(); + init(IsVirgin); Members -> ?LOG_NOTICE( "Found the following metadata store members: ~p", [Members], #{domain => ?RMQLOG_DOMAIN_DB}), - %% Delete transient queues on init. - %% Note that we also do this in the - %% `rabbit_amqqueue:on_node_down/1' callback. We must try this - %% deletion during init because the cluster may have been in a - %% minority when this node went down. We wait for a majority while - %% booting (via `rabbit_khepri:setup/0') though so this deletion is - %% likely to succeed. - rabbit_amqqueue:delete_transient_queues_on_node(node()) + maybe + ?LOG_DEBUG( + "Khepri-based " ?RA_FRIENDLY_NAME " catching up on " + "replication to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= fence(retry_timeout()), + ?LOG_DEBUG( + "local Khepri-based " ?RA_FRIENDLY_NAME " member is caught " + "up to the Raft cluster leader", [], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= case IsVirgin of + true -> + register_projections(); + false -> + ok + end, + %% Delete transient queues on init. + %% Note that we also do this in the + %% `rabbit_amqqueue:on_node_down/1' callback. We must try this + %% deletion during init because the cluster may have been in a + %% minority when this node went down. We wait for a majority + %% while registering projections above though so this deletion + %% is likely to succeed. + rabbit_amqqueue:delete_transient_queues_on_node(node()) + end end. %% @private @@ -1063,6 +1062,9 @@ info() -> handle_async_ret(RaEvent) -> khepri:handle_async_ret(?STORE_ID, RaEvent). +fence(Timeout) -> + khepri:fence(?STORE_ID, Timeout). + %% ------------------------------------------------------------------- %% collect_payloads(). %% ------------------------------------------------------------------- @@ -1105,6 +1107,36 @@ collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> Acc end, Acc0, Props). +-spec unregister_legacy_projections() -> Ret when + Ret :: ok | timeout_error(). +%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x +%% versions. +%% +%% In 3.13.x until 3.13.8 we mistakenly registered these projections even if +%% Khepri was not enabled. This function is used by the `khepri_db' enable +%% callback to remove those projections before we register the ones necessary +%% for 4.0.x. +%% +%% @private + +unregister_legacy_projections() -> + %% Note that we don't use `all' since `khepri_mnesia_migration' also + %% creates a projection table which we don't want to unregister. Instead + %% we list all of the legacy projection names: + LegacyNames = [ + rabbit_khepri_exchange, + rabbit_khepri_queue, + rabbit_khepri_vhost, + rabbit_khepri_users, + rabbit_khepri_global_rtparams, + rabbit_khepri_per_vhost_rtparams, + rabbit_khepri_user_permissions, + rabbit_khepri_bindings, + rabbit_khepri_index_route, + rabbit_khepri_topic_trie + ], + khepri:unregister_projections(?STORE_ID, LegacyNames). + register_projections() -> RegFuns = [fun register_rabbit_exchange_projection/0, fun register_rabbit_queue_projection/0, @@ -1116,20 +1148,23 @@ register_projections() -> fun register_rabbit_bindings_projection/0, fun register_rabbit_index_route_projection/0, fun register_rabbit_topic_graph_projection/0], - [case RegisterFun() of - ok -> - ok; - %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` would - %% return `{error, exists}` for projections which already exist. - {error, exists} -> - ok; - %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. - {error, {khepri, projection_already_exists, _Info}} -> - ok; - {error, Error} -> - throw(Error) - end || RegisterFun <- RegFuns], - ok. + rabbit_misc:for_each_while_ok( + fun(RegisterFun) -> + case RegisterFun() of + ok -> + ok; + %% Before Khepri v0.13.0, `khepri:register_projection/1,2,3` + %% would return `{error, exists}` for projections which + %% already exist. + {error, exists} -> + ok; + %% In v0.13.0+, Khepri returns a `?khepri_error(..)` instead. + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end + end, RegFuns). register_rabbit_exchange_projection() -> Name = rabbit_khepri_exchange, @@ -1155,21 +1190,21 @@ register_rabbit_vhost_projection() -> register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_users_projection() -> - Name = rabbit_khepri_users, + Name = rabbit_khepri_user, PathPattern = rabbit_db_user:khepri_user_path( _UserName = ?KHEPRI_WILDCARD_STAR), KeyPos = 2, %% #internal_user.username register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_global_runtime_parameters_projection() -> - Name = rabbit_khepri_global_rtparams, + Name = rabbit_khepri_global_rtparam, PathPattern = rabbit_db_rtparams:khepri_global_rp_path( _Key = ?KHEPRI_WILDCARD_STAR_STAR), KeyPos = #runtime_parameters.key, register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_per_vhost_runtime_parameters_projection() -> - Name = rabbit_khepri_per_vhost_rtparams, + Name = rabbit_khepri_per_vhost_rtparam, PathPattern = rabbit_db_rtparams:khepri_vhost_rp_path( _VHost = ?KHEPRI_WILDCARD_STAR_STAR, _Component = ?KHEPRI_WILDCARD_STAR_STAR, @@ -1178,7 +1213,7 @@ register_rabbit_per_vhost_runtime_parameters_projection() -> register_simple_projection(Name, PathPattern, KeyPos). register_rabbit_user_permissions_projection() -> - Name = rabbit_khepri_user_permissions, + Name = rabbit_khepri_user_permission, PathPattern = rabbit_db_user:khepri_user_permission_path( _UserName = ?KHEPRI_WILDCARD_STAR, _VHost = ?KHEPRI_WILDCARD_STAR), @@ -1188,7 +1223,7 @@ register_rabbit_user_permissions_projection() -> register_simple_projection(Name, PathPattern, KeyPos) -> Options = #{keypos => KeyPos}, Projection = khepri_projection:new(Name, copy, Options), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_bindings_projection() -> MapFun = fun(_Path, Binding) -> @@ -1197,14 +1232,14 @@ register_rabbit_bindings_projection() -> ProjectionFun = projection_fun_for_sets(MapFun), Options = #{keypos => #route.binding}, Projection = khepri_projection:new( - rabbit_khepri_bindings, ProjectionFun, Options), + rabbit_khepri_binding, ProjectionFun, Options), PathPattern = rabbit_db_binding:khepri_route_path( _VHost = ?KHEPRI_WILDCARD_STAR, _ExchangeName = ?KHEPRI_WILDCARD_STAR, _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). register_rabbit_index_route_projection() -> MapFun = fun(Path, _) -> @@ -1236,7 +1271,7 @@ register_rabbit_index_route_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). %% Routing information is stored in the Khepri store as a `set'. %% In order to turn these bindings into records in an ETS `bag', we use a @@ -1337,7 +1372,7 @@ register_rabbit_topic_graph_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), - khepri:register_projection(?RA_CLUSTER_NAME, PathPattern, Projection). + khepri:register_projection(?STORE_ID, PathPattern, Projection). -spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when Table :: ets:tid(), @@ -1515,9 +1550,19 @@ get_feature_state(Node) -> %% @private khepri_db_migration_enable(#{feature_name := FeatureName}) -> - case sync_cluster_membership_from_mnesia(FeatureName) of - ok -> migrate_mnesia_tables(FeatureName); - Error -> Error + maybe + ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ?LOG_INFO( + "Feature flag `~s`: unregistering legacy projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= unregister_legacy_projections(), + ?LOG_INFO( + "Feature flag `~s`: registering projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= register_projections(), + migrate_mnesia_tables(FeatureName) end. %% @private diff --git a/deps/rabbit/src/rabbit_vhost.erl b/deps/rabbit/src/rabbit_vhost.erl index 4da8fe1d6785..00c148e275ea 100644 --- a/deps/rabbit/src/rabbit_vhost.erl +++ b/deps/rabbit/src/rabbit_vhost.erl @@ -299,8 +299,7 @@ delete(VHost, ActingUser) -> assert_benign(rabbit_amqqueue:with(Name, QDelFun), ActingUser) end || Q <- rabbit_amqqueue:list(VHost)], rabbit_log:info("Deleting exchanges in vhost '~ts' because it's being deleted", [VHost]), - [ok = rabbit_exchange:ensure_deleted(Name, false, ActingUser) || - #exchange{name = Name} <- rabbit_exchange:list(VHost)], + ok = rabbit_exchange:delete_all(VHost, ActingUser), rabbit_log:info("Clearing policies and runtime parameters in vhost '~ts' because it's being deleted", [VHost]), _ = rabbit_runtime_parameters:clear_vhost(VHost, ActingUser), rabbit_log:debug("Removing vhost '~ts' from the metadata storage because it's being deleted", [VHost]), diff --git a/deps/rabbit/test/metadata_store_phase1_SUITE.erl b/deps/rabbit/test/metadata_store_phase1_SUITE.erl index 7e50445820f0..cf080d170ce1 100644 --- a/deps/rabbit/test/metadata_store_phase1_SUITE.erl +++ b/deps/rabbit/test/metadata_store_phase1_SUITE.erl @@ -192,6 +192,7 @@ setup_khepri(Config) -> %% Configure Khepri. It takes care of configuring Ra system & cluster. It %% uses the Mnesia directory to store files. ok = rabbit_khepri:setup(undefined), + ok = rabbit_khepri:register_projections(), ct:pal("Khepri info below:"), rabbit_khepri:info(), diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index c230b63cf3a5..f3d556595802 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -992,7 +992,7 @@ enable_khepri_metadata_store(Config, FFs0) -> case enable_feature_flag(C, FF) of ok -> C; - Skip -> + {skip, _} = Skip -> ct:pal("Enabling metadata store failed: ~p", [Skip]), Skip end diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 3cd01a79e852..76d9199a586c 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -21,6 +21,7 @@ all() -> authentication, audit_queue, audit_exchange, + audit_exchange_internal_parameter, audit_binding, audit_vhost, audit_vhost_deletion, @@ -28,7 +29,6 @@ all() -> audit_connection, audit_direct_connection, audit_consumer, - audit_vhost_internal_parameter, audit_parameter, audit_policy, audit_vhost_limit, @@ -272,13 +272,19 @@ audit_consumer(Config) -> rabbit_ct_client_helpers:close_channel(Ch), ok. -audit_vhost_internal_parameter(Config) -> +audit_exchange_internal_parameter(Config) -> Ch = declare_event_queue(Config, <<"parameter.*">>), - User = <<"Bugs Bunny">>, - Vhost = <<"test-vhost">>, - rabbit_ct_broker_helpers:add_vhost(Config, 0, Vhost, User), - rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User), + X = <<"exchange.audited-for-parameters">>, + #'exchange.declare_ok'{} = + amqp_channel:call(Ch, #'exchange.declare'{exchange = X, + type = <<"topic">>}), + #'exchange.delete_ok'{} = + amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), + + User = proplists:get_value(rmq_username, Config), + %% Exchange deletion sets and clears a runtime parameter which acts as a + %% kind of lock: receive_user_in_event(<<"parameter.set">>, User), receive_user_in_event(<<"parameter.cleared">>, User),