Skip to content

Commit

Permalink
Merge pull request #12273 from rabbitmq/md/bp/v4.0.x/khepri-prs
Browse files Browse the repository at this point in the history
  • Loading branch information
the-mikedavis authored Sep 10, 2024
2 parents 067b038 + 2e7f149 commit ecdf04d
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 97 deletions.
10 changes: 5 additions & 5 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ init() ->
end,

Ret = case rabbit_khepri:is_enabled() of
true -> init_using_khepri();
false -> init_using_mnesia()
true -> init_using_khepri(IsVirgin);
false -> init_using_mnesia(IsVirgin)
end,
case Ret of
ok ->
Expand All @@ -91,19 +91,19 @@ pre_init(IsVirgin) ->
OtherMembers = rabbit_nodes:nodes_excl_me(Members),
rabbit_db_cluster:ensure_feature_flags_are_in_sync(OtherMembers, IsVirgin).

init_using_mnesia() ->
init_using_mnesia(_IsVirgin) ->
?LOG_DEBUG(
"DB: initialize Mnesia",
#{domain => ?RMQLOG_DOMAIN_DB}),
ok = rabbit_mnesia:init(),
?assertEqual(rabbit:data_dir(), mnesia_dir()),
rabbit_sup:start_child(mnesia_sync).

init_using_khepri() ->
init_using_khepri(IsVirgin) ->
?LOG_DEBUG(
"DB: initialize Khepri",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_khepri:init().
rabbit_khepri:init(IsVirgin).

init_finished() ->
%% Used during initialisation by rabbit_logger_exchange_h.erl
Expand Down
30 changes: 12 additions & 18 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
-define(MNESIA_SEMI_DURABLE_TABLE, rabbit_semi_durable_route).
-define(MNESIA_REVERSE_TABLE, rabbit_reverse_route).
-define(MNESIA_INDEX_TABLE, rabbit_index_route).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_bindings).
-define(KHEPRI_BINDINGS_PROJECTION, rabbit_khepri_binding).
-define(KHEPRI_INDEX_ROUTE_PROJECTION, rabbit_khepri_index_route).

%% -------------------------------------------------------------------
Expand Down Expand Up @@ -835,9 +835,8 @@ delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx:get_many(Path),
ok = khepri_tx:delete_many(Path),
maps:fold(fun(_P, Set, Acc) ->
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).

Expand Down Expand Up @@ -881,25 +880,20 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
OnlyDurable :: boolean(),
Deletions :: rabbit_binding:deletions().

delete_for_destination_in_khepri(DstName, OnlyDurable) ->
BindingsMap = match_destination_in_khepri(DstName),
maps:foreach(fun(K, _V) -> khepri_tx:delete(K) end, BindingsMap),
Bindings = maps:fold(fun(_, Set, Acc) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

match_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}) ->
Path = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
{ok, Map} = khepri_tx:get_many(Path),
Map.

%% -------------------------------------------------------------------
%% delete_transient_for_destination_in_mnesia().
%% -------------------------------------------------------------------
Expand Down
64 changes: 64 additions & 0 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
peek_serial/1,
next_serial/1,
delete/2,
delete_all/1,
delete_serial/1,
recover/1,
match/1,
Expand Down Expand Up @@ -657,6 +658,69 @@ delete_in_khepri(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSour
ok = khepri_tx:delete(khepri_exchange_path(XName)),
rabbit_db_binding:delete_all_for_exchange_in_khepri(X, OnlyDurable, RemoveBindingsForSource).

%% -------------------------------------------------------------------
%% delete_all().
%% -------------------------------------------------------------------

-spec delete_all(VHostName) -> Ret when
VHostName :: vhost:name(),
Deletions :: rabbit_binding:deletions(),
Ret :: {ok, Deletions}.
%% @doc Deletes all exchanges for a given vhost.
%%
%% @returns an `{ok, Deletions}' tuple containing the {@link
%% rabbit_binding:deletions()} caused by deleting the exchanges under the given
%% vhost.
%%
%% @private

delete_all(VHostName) ->
rabbit_khepri:handle_fallback(
#{mnesia => fun() -> delete_all_in_mnesia(VHostName) end,
khepri => fun() -> delete_all_in_khepri(VHostName) end
}).

delete_all_in_mnesia(VHostName) ->
rabbit_mnesia:execute_mnesia_transaction(
fun() ->
delete_all_in_mnesia_tx(VHostName)
end).

delete_all_in_mnesia_tx(VHostName) ->
Match = #exchange{name = rabbit_misc:r(VHostName, exchange), _ = '_'},
Xs = mnesia:match_object(?MNESIA_TABLE, Match, write),
Deletions =
lists:foldl(
fun(X, Acc) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
unconditional_delete_in_mnesia( X, false),
XDeletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Acc, XDeletions1)
end, rabbit_binding:new_deletions(), Xs),
{ok, Deletions}.

delete_all_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
delete_all_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

delete_all_in_khepri_tx(VHostName) ->
Pattern = khepri_exchange_path(VHostName, ?KHEPRI_WILDCARD_STAR),
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, {X, deleted, Bindings}, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

%% -------------------------------------------------------------------
%% delete_serial().
%% -------------------------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
]).

-define(MNESIA_TABLE, rabbit_runtime_parameters).
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparams).
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparams).
-define(KHEPRI_GLOBAL_PROJECTION, rabbit_khepri_global_rtparam).
-define(KHEPRI_VHOST_PROJECTION, rabbit_khepri_per_vhost_rtparam).
-define(any(Value), case Value of
'_' -> ?KHEPRI_WILDCARD_STAR;
_ -> Value
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
-define(MNESIA_TABLE, rabbit_user).
-define(PERM_MNESIA_TABLE, rabbit_user_permission).
-define(TOPIC_PERM_MNESIA_TABLE, rabbit_topic_permission).
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_users).
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permissions).
-define(KHEPRI_USERS_PROJECTION, rabbit_khepri_user).
-define(KHEPRI_PERMISSIONS_PROJECTION, rabbit_khepri_user_permission).

%% -------------------------------------------------------------------
%% create().
Expand Down
13 changes: 12 additions & 1 deletion deps/rabbit/src/rabbit_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
update_scratch/3, update_decorators/2, immutable/1,
info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4,
route/2, route/3, delete/3, validate_binding/2, count/0,
ensure_deleted/3]).
ensure_deleted/3, delete_all/2]).
-export([list_names/0]).
-export([serialise_events/1]).
-export([serial/1, peek_serial/1]).
Expand Down Expand Up @@ -484,6 +484,17 @@ delete(XName, IfUnused, Username) ->
XName#resource.name, Username)
end.

-spec delete_all(VHostName, ActingUser) -> Ret when
VHostName :: vhost:name(),
ActingUser :: rabbit_types:username(),
Ret :: ok.

delete_all(VHostName, ActingUser) ->
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
Deletions1 = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions1, ActingUser),
ok.

process_deletions({error, _} = E) ->
E;
process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
Expand Down
Loading

0 comments on commit ecdf04d

Please sign in to comment.