From 8c046c71c84d0aaff38c7e3ebb603b8453e99215 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 24 Oct 2024 09:25:49 +0000 Subject: [PATCH 1/7] Fix test flake As described in https://github.com/rabbitmq/rabbitmq-server/issues/12413#issuecomment-2385379386 test case queue_topology flaked in CI with the following error: ``` rabbitmq_amqp_client > management_SUITE > cluster_size_3 > queue_topology #1. {error,{test_case_failed,{824, <<"rmq-ct-cluster_size_3-1-21000@localhost">>}}} ``` This flake could not be reproduced locally (neither with Mnesia nor with Khepri). --- .../test/management_SUITE.erl | 43 +++++++++++-------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/deps/rabbitmq_amqp_client/test/management_SUITE.erl b/deps/rabbitmq_amqp_client/test/management_SUITE.erl index 4926f13c8c92..3431ddecd8aa 100644 --- a/deps/rabbitmq_amqp_client/test/management_SUITE.erl +++ b/deps/rabbitmq_amqp_client/test/management_SUITE.erl @@ -810,25 +810,30 @@ queue_topology(Config) -> ok = rabbit_ct_broker_helpers:stop_node(Config, 0), Init2 = {_, LinkPair2} = init(Config, 2), - {ok, QQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, QQName), - {ok, SQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, SQName), - - case maps:get(leader, QQInfo2) of - N1 -> ok; - N2 -> ok; - Other0 -> ct:fail({?LINE, Other0}) - end, - case maps:get(leader, SQInfo2) of - N1 -> ok; - N2 -> ok; - Other1 -> ct:fail({?LINE, Other1}) - end, - - %% Replicas should include both online and offline replicas. - {ok, QQReplicas2} = maps:find(replicas, QQInfo2), - ?assertEqual(Nodes, lists:usort(QQReplicas2)), - {ok, SQReplicas2} = maps:find(replicas, SQInfo2), - ?assertEqual(Nodes, lists:usort(SQReplicas2)), + eventually( + ?_assert( + begin + {ok, QQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, QQName), + {ok, SQInfo2} = rabbitmq_amqp_client:get_queue(LinkPair2, SQName), + + {ok, QQReplicas2} = maps:find(replicas, QQInfo2), + {ok, SQReplicas2} = maps:find(replicas, SQInfo2), + QQReplicas = lists:usort(QQReplicas2), + SQReplicas = lists:usort(SQReplicas2), + QQLeader = maps:get(leader, QQInfo2), + SQLeader = maps:get(leader, SQInfo2), + ct:pal("quorum queue replicas: ~p~n" + "quorum queue leader: ~s~n" + "stream replicas: ~p~n" + "stream leader: ~s", + [QQReplicas, QQLeader, SQReplicas, SQLeader]), + %% Replicas should always include both online and offline replicas. + QQReplicas =:= Nodes andalso + SQReplicas =:= Nodes andalso + (QQLeader =:= N1 orelse QQLeader =:= N2) andalso + (SQLeader =:= N1 orelse SQLeader =:= N2) + end + ), 1000, 5), ok = rabbit_ct_broker_helpers:start_node(Config, 0), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair2, CQName), From 0c905f9b17d3bcd1388d943163f02ec8a42e5776 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 24 Oct 2024 12:34:03 +0200 Subject: [PATCH 2/7] Validate setting permissions works in order to troubleshoot the flake described in https://github.com/rabbitmq/rabbitmq-server/issues/12413#issuecomment-2419293869 ``` Node: rabbit_shard2@localhost Case: amqp_system_SUITE:access_failure Reason: {error,{{badmatch,{error,134, "Unhandled exception. System.Exception: expected exception not received\n at Program.Test.accessFailure(String uri) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 477\n at Program.main(String[] argv) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 509\n"}}, [{amqp_system_SUITE,run_dotnet_test,2, [{file,"amqp_system_SUITE.erl"}, {line,257}]}, ``` --- deps/rabbit/test/amqp_system_SUITE.erl | 36 ++++++++++--------- .../fsharp-tests/Program.fs | 18 +++++----- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/deps/rabbit/test/amqp_system_SUITE.erl b/deps/rabbit/test/amqp_system_SUITE.erl index d739c7b3fc96..37f9b3ac102d 100644 --- a/deps/rabbit/test/amqp_system_SUITE.erl +++ b/deps/rabbit/test/amqp_system_SUITE.erl @@ -219,28 +219,32 @@ auth_failure(Config) -> access_failure(Config) -> User = atom_to_binary(?FUNCTION_NAME), - rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), - rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>, - <<".*">>, %% configure - <<"^banana.*">>, %% write - <<"^banana.*">> %% read - ), - run(Config, [ {dotnet, "access_failure"} ]). + ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), + ok = rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>, + <<".*">>, %% configure + <<"^banana.*">>, %% write + <<"^banana.*">> %% read + ), + run(Config, [ {dotnet, "access_failure"} ]), + ok = rabbit_ct_broker_helpers:delete_user(Config, User). + access_failure_not_allowed(Config) -> User = atom_to_binary(?FUNCTION_NAME), - rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), - run(Config, [ {dotnet, "access_failure_not_allowed"} ]). + ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), + run(Config, [ {dotnet, "access_failure_not_allowed"} ]), + ok = rabbit_ct_broker_helpers:delete_user(Config, User). access_failure_send(Config) -> User = atom_to_binary(?FUNCTION_NAME), - rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), - rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>, - <<".*">>, %% configure - <<"^banana.*">>, %% write - <<"^banana.*">> %% read - ), - run(Config, [ {dotnet, "access_failure_send"} ]). + ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), + ok = rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>, + <<".*">>, %% configure + <<"^banana.*">>, %% write + <<"^banana.*">> %% read + ), + run(Config, [ {dotnet, "access_failure_send"} ]), + ok = rabbit_ct_broker_helpers:delete_user(Config, User). run(Config, Flavors) -> ClientLibrary = ?config(amqp_client_library, Config), diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs index 5a1a0aaa5392..453406b84253 100755 --- a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs @@ -413,7 +413,7 @@ module Test = let invalidRoutes uri = - for dest, cond in + for addr, cond in ["/exchanges/missing", "amqp:not-found" "/fruit/orange", "amqp:invalid-field"] do use ac = connectAnon uri @@ -428,11 +428,11 @@ module Test = let attached = new OnAttached (fun _ _ -> trySet mre) let sender = new SenderLink(ac.Session, "test-sender", - Target(Address = dest), attached); + Target(Address = addr), attached); mre.WaitOne() |> ignore try - let receiver = ReceiverLink(ac.Session, "test-receiver", dest) + let receiver = ReceiverLink(ac.Session, "test-receiver", addr) receiver.Close() with | :? Amqp.AmqpException as ae -> @@ -454,11 +454,11 @@ module Test = let u = Uri uri let uri = sprintf "amqp://access_failure:boo@%s:%i" u.Host u.Port use ac = connect uri - let dest = "/queues/test" + let target = "/queues/test" ac.Session.add_Closed ( new ClosedCallback (fun _ err -> printfn "session err %A" err.Condition )) - let sender = new SenderLink(ac.Session, "test-sender", dest) + let sender = new SenderLink(ac.Session, "test-sender", target) sender.Send(new Message "hi", TimeSpan.FromSeconds 15.) failwith "expected exception not received" with @@ -471,8 +471,8 @@ module Test = let u = Uri uri let uri = sprintf "amqp://access_failure:boo@%s:%i" u.Host u.Port use ac = connect uri - let dest = "/queues/test" - let receiver = ReceiverLink(ac.Session, "test-receiver", dest) + let src = "/queues/test" + let receiver = ReceiverLink(ac.Session, "test-receiver", src) receiver.Close() failwith "expected exception not received" with @@ -485,8 +485,8 @@ module Test = let u = Uri uri let uri = sprintf "amqp://access_failure_not_allowed:boo@%s:%i" u.Host u.Port use ac = connect uri - let dest = "/queues/test" - let receiver = ReceiverLink(ac.Session, "test-receiver", dest) + let src = "/queues/test" + let receiver = ReceiverLink(ac.Session, "test-receiver", src) receiver.Close() failwith "expected exception not received" with From 2c0cdee7d23afe2b2e95b4a4c805b6662008cf99 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 24 Oct 2024 13:03:05 +0200 Subject: [PATCH 3/7] Support x-cc message annotation (#12559) Support x-cc message annotation Support an `x-cc` message annotation in AMQP 1.0 similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. The value of the `x-cc` message annotation must by a list of strings. A message annotation is used since application properties allow only simple types. --- deps/rabbit/BUILD.bazel | 6 + deps/rabbit/app.bzl | 2 +- deps/rabbit/src/mc.erl | 30 ++- deps/rabbit/src/mc_amqp.erl | 47 +--- deps/rabbit/src/mc_amqpl.erl | 19 +- deps/rabbit/src/mc_compat.erl | 4 + deps/rabbit/src/mc_util.erl | 2 +- deps/rabbit/src/rabbit_amqp_session.erl | 70 +++-- deps/rabbit/src/rabbit_stream_queue.erl | 37 ++- deps/rabbit/test/amqp_address_SUITE.erl | 5 +- deps/rabbit/test/amqp_client_SUITE.erl | 276 +++++++++++++++++++- deps/rabbit/test/dead_lettering_SUITE.erl | 18 +- deps/rabbit/test/mc_unit_SUITE.erl | 74 +++++- deps/rabbit/test/topic_permission_SUITE.erl | 88 +++++++ deps/rabbitmq_mqtt/src/mc_mqtt.erl | 6 + deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl | 5 + release-notes/4.1.0.md | 6 + 17 files changed, 593 insertions(+), 102 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 8ce54e6f584b..76be5953a6c3 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -862,6 +862,12 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "topic_permission_SUITE", size = "medium", + additional_beam = [ + ":test_amqp_utils_beam", + ], + runtime_deps = [ + "//deps/rabbitmq_amqp_client:erlang_app", + ], ) rabbitmq_integration_suite( diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index dca277a2ab00..9d6f7fab563f 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/topic_permission_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app"], + deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"], ) erlang_bytecode( name = "transactions_SUITE_beam_files", diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 3352f26185de..b3c51dca3976 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -26,6 +26,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, exchange/1, routing_keys/1, @@ -88,6 +89,7 @@ {timestamp, non_neg_integer()} | {list, [tagged_value()]} | {map, [{tagged_value(), tagged_value()}]} | + {array, atom(), [tagged_value()]} | null | undefined. @@ -104,11 +106,16 @@ {MetadataSize :: non_neg_integer(), PayloadSize :: non_neg_integer()}. -%% retrieve and x- header from the protocol data +%% retrieve an x- header from the protocol data %% the return value should be tagged with an AMQP 1.0 type -callback x_header(binary(), proto_state()) -> tagged_value(). +%% retrieve x- headers from the protocol data +%% the return values should be tagged with an AMQP 1.0 type +-callback x_headers(proto_state()) -> + #{binary() => tagged_value()}. + %% retrieve a property field from the protocol data %% e.g. message_id, correlation_id -callback property(atom(), proto_state()) -> @@ -148,7 +155,7 @@ init(Proto, Data, Anns) -> -spec init(protocol(), term(), annotations(), environment()) -> state(). init(Proto, Data, Anns0, Env) -> {ProtoData, ProtoAnns} = Proto:init(Data), - Anns1 = case map_size(Env) == 0 of + Anns1 = case map_size(Env) =:= 0 of true -> Anns0; false -> Anns0#{env => Env} end, @@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto, x_header(Key, BasicMsg) -> mc_compat:x_header(Key, BasicMsg). +-spec x_headers(state()) -> + #{binary() => tagged_value()}. +x_headers(#?MODULE{protocol = Proto, + annotations = Anns, + data = Data}) -> + %% x-headers may be have been added to the annotations map. + New = maps:filtermap( + fun(Key, Val) -> + case mc_util:is_x_header(Key) of + true -> + {true, mc_util:infer_type(Val)}; + false -> + false + end + end, Anns), + maps:merge(Proto:x_headers(Data), New); +x_headers(BasicMsg) -> + mc_compat:x_headers(BasicMsg). + -spec routing_headers(state(), [x_headers | complex_types]) -> #{binary() => property_value()}. routing_headers(#?MODULE{protocol = Proto, diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index ed6c4b4145d6..06a923763da9 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -8,6 +8,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -125,6 +126,9 @@ size(#v1{message_annotations = MA, x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). +x_headers(Msg) -> + #{K => V || {{_T, K}, V} <- message_annotations(Msg)}. + property(_Prop, #msg_body_encoded{properties = undefined}) -> undefined; property(Prop, #msg_body_encoded{properties = Props}) -> @@ -618,41 +622,16 @@ encode_deaths(Deaths) -> {map, Map} end, Deaths). -essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> +essential_properties(Msg) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), Ttl = get_property(ttl, Msg), - Anns0 = #{?ANN_DURABLE => Durable}, - Anns = maps_put_truthy( - ?ANN_PRIORITY, Priority, - maps_put_truthy( - ?ANN_TIMESTAMP, Timestamp, - maps_put_truthy( - ttl, Ttl, - Anns0))), - case MA of - [] -> - Anns; - _ -> - lists:foldl( - fun ({{symbol, <<"x-routing-key">>}, - {utf8, Key}}, Acc) -> - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> [Key | L] end, - [Key], - Acc); - ({{symbol, <<"x-cc">>}, - {list, CCs0}}, Acc) -> - CCs = [CC || {_T, CC} <- CCs0], - maps:update_with(?ANN_ROUTING_KEYS, - fun(L) -> L ++ CCs end, - CCs, - Acc); - ({{symbol, <<"x-exchange">>}, - {utf8, Exchange}}, Acc) -> - Acc#{?ANN_EXCHANGE => Exchange}; - (_, Acc) -> - Acc - end, Anns, MA) - end. + Anns = #{?ANN_DURABLE => Durable}, + maps_put_truthy( + ?ANN_PRIORITY, Priority, + maps_put_truthy( + ?ANN_TIMESTAMP, Timestamp, + maps_put_truthy( + ttl, Ttl, + Anns))). diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 723e60cd3f79..936a1b130d89 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -11,6 +11,7 @@ init/1, size/1, x_header/2, + x_headers/1, routing_headers/2, convert_to/3, convert_from/3, @@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) -> Content = rabbit_binary_parser:ensure_content_decoded(Content0), x_header(Key, Content). +x_headers(#content{properties = #'P_basic'{headers = undefined}}) -> + #{}; +x_headers(#content{properties = #'P_basic'{headers = Headers}}) -> + L = lists:filtermap( + fun({Name, Type, Val}) -> + case mc_util:is_x_header(Name) of + true -> + {true, {Name, from_091(Type, Val)}}; + false -> + false + end + end, Headers), + maps:from_list(L); +x_headers(#content{properties = none} = Content0) -> + Content = rabbit_binary_parser:ensure_content_decoded(Content0), + x_headers(Content). + property(Prop, Content) -> mc_util:infer_type(mc_compat:get_property(Prop, Content)). @@ -707,7 +725,6 @@ supported_header_value_type(table) -> supported_header_value_type(_) -> true. - amqp10_map_get(_K, []) -> undefined; amqp10_map_get(K, Tuples) -> diff --git a/deps/rabbit/src/mc_compat.erl b/deps/rabbit/src/mc_compat.erl index 056905239d96..5fce91b202a4 100644 --- a/deps/rabbit/src/mc_compat.erl +++ b/deps/rabbit/src/mc_compat.erl @@ -20,6 +20,7 @@ priority/1, set_ttl/2, x_header/2, + x_headers/1, routing_headers/2, %%% convert_to/2, @@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) -> x_header(Key,#basic_message{content = Content}) -> mc_amqpl:x_header(Key, Content). +x_headers(#basic_message{content = Content}) -> + mc_amqpl:x_headers(Content). + routing_headers(#basic_message{content = Content}, Opts) -> mc_amqpl:routing_headers(Content, Opts). diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 1f20d15699db..9ec7928de9b7 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) -> amqp_map_get(Key, {map, List}, Default) -> amqp_map_get(Key, List, Default); amqp_map_get(Key, List, Default) when is_list(List) -> - case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of + case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of {value, {_K, V}} -> V; false -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c9d505647eb5..81e4d88d071d 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -154,6 +154,7 @@ %% The routing key is either defined in the ATTACH frame and static for %% the life time of the link or dynamically provided in each message's %% "to" field (address v2) or "subject" field (address v1). + %% (A publisher can set additional routing keys via the x-cc message annotation.) routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), @@ -2369,11 +2370,11 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of - {ok, X, RoutingKey, Mc1, PermCache} -> + {ok, X, RoutingKeys, Mc1, PermCache} -> Mc2 = rabbit_message_interceptor:intercept(Mc1), check_user_id(Mc2, User), - TopicPermCache = check_write_permitted_on_topic( - X, User, RoutingKey, TopicPermCache0), + TopicPermCache = check_write_permitted_on_topics( + X, User, RoutingKeys, TopicPermCache0), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, @@ -2408,14 +2409,14 @@ incoming_link_transfer( "delivery_tag=~p, delivery_id=~p, reason=~p", [DeliveryTag, DeliveryId, Reason]) end; - {error, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} -> Disposition = case Settled of true -> []; false -> [released(DeliveryId)] end, Detach = [detach(HandleInt, Link0, Err)], {error, Disposition ++ Detach}; - {error, anonymous_terminus, #'v1_0.error'{} = Err} -> + {error, {anonymous_terminus, true}, #'v1_0.error'{} = Err} -> %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors case Settled of true -> @@ -2440,13 +2441,13 @@ incoming_link_transfer( end. lookup_target(#exchange{} = X, LinkRKey, Mc, _, _, PermCache) -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); lookup_target(#resource{} = XName, LinkRKey, Mc, _, _, PermCache) -> case rabbit_exchange:lookup(XName) of {ok, X} -> - lookup_routing_key(X, LinkRKey, Mc, PermCache); + lookup_routing_key(X, LinkRKey, Mc, false, PermCache); {error, not_found} -> - {error, error_not_found(XName)} + {error, {anonymous_terminus, false}, error_not_found(XName)} end; lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case mc:property(to, Mc) of @@ -2458,25 +2459,26 @@ lookup_target(to, to, Mc, Vhost, User, PermCache0) -> case rabbit_exchange:lookup(XName) of {ok, X} -> check_internal_exchange(X), - lookup_routing_key(X, RKey, Mc, PermCache); + lookup_routing_key(X, RKey, Mc, true, PermCache); {error, not_found} -> - {error, anonymous_terminus, error_not_found(XName)} + {error, {anonymous_terminus, true}, error_not_found(XName)} end; {error, bad_address} -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"bad 'to' address string: ", String/binary>>}}} end; undefined -> - {error, anonymous_terminus, + {error, {anonymous_terminus, true}, #'v1_0.error'{ condition = ?V_1_0_AMQP_ERROR_PRECONDITION_FAILED, description = {utf8, <<"anonymous terminus requires 'to' address to be set">>}}} end. lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, - RKey0, Mc0, PermCache) -> + RKey0, Mc0, AnonTerm, PermCache) -> + Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), RKey = case RKey0 of subject -> case mc:property(subject, Mc0) of @@ -2488,9 +2490,31 @@ lookup_routing_key(X = #exchange{name = #resource{name = XNameBin}}, _ when is_binary(RKey0) -> RKey0 end, - Mc1 = mc:set_annotation(?ANN_EXCHANGE, XNameBin, Mc0), - Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc1), - {ok, X, RKey, Mc, PermCache}. + case mc:x_header(<<"x-cc">>, Mc0) of + undefined -> + RKeys = [RKey], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache}; + {list, CCs0} = L -> + try lists:map(fun({utf8, CC}) -> CC end, CCs0) of + CCs -> + RKeys = [RKey | CCs], + Mc = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), + {ok, X, RKeys, Mc, PermCache} + catch error:function_clause -> + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(L)} + end; + BadValue -> + {error, {anonymous_terminus, AnonTerm}, bad_x_cc(BadValue)} + end. + +bad_x_cc(Value) -> + Desc = unicode:characters_to_binary( + lists:flatten( + io_lib:format( + "bad value for 'x-cc' message-annotation: ~tp", [Value]))), + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, Desc}}. process_routing_confirm([], _SenderSettles = true, _, U) -> rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1), @@ -3445,14 +3469,20 @@ check_resource_access(Resource, Perm, User, Cache) -> end end. --spec check_write_permitted_on_topic( +-spec check_write_permitted_on_topics( rabbit_types:exchange(), rabbit_types:user(), - rabbit_types:routing_key(), + [rabbit_types:routing_key(),...], topic_permission_cache()) -> topic_permission_cache(). -check_write_permitted_on_topic(Resource, User, RoutingKey, TopicPermCache) -> - check_topic_authorisation(Resource, User, RoutingKey, write, TopicPermCache). +check_write_permitted_on_topics(#exchange{type = topic} = Resource, + User, RoutingKeys, TopicPermCache) -> + lists:foldl( + fun(RoutingKey, Cache) -> + check_topic_authorisation(Resource, User, RoutingKey, write, Cache) + end, TopicPermCache, RoutingKeys); +check_write_permitted_on_topics(_, _, _, TopicPermCache) -> + TopicPermCache. -spec check_read_permitted_on_topic( rabbit_types:exchange(), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a011dc09a650..111b7d8b7df0 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -1272,17 +1272,36 @@ parse_uncompressed_subbatch( entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) -> Mc0 = mc:init(mc_amqp, Entry, #{}), - %% If exchange or routing_keys annotation isn't present the entry most likely came + %% If exchange or routing keys annotation isn't present the entry most likely came %% from the rabbitmq-stream plugin so we'll choose defaults that simulate use %% of the direct exchange. - Mc1 = case mc:exchange(Mc0) of - undefined -> mc:set_annotation(?ANN_EXCHANGE, <<>>, Mc0); - _ -> Mc0 - end, - Mc2 = case mc:routing_keys(Mc1) of - [] -> mc:set_annotation(?ANN_ROUTING_KEYS, [QName], Mc1); - _ -> Mc1 - end, + XHeaders = mc:x_headers(Mc0), + Exchange = case XHeaders of + #{<<"x-exchange">> := {utf8, X}} -> + X; + _ -> + <<>> + end, + RKeys0 = case XHeaders of + #{<<"x-cc">> := {list, CCs}} -> + [CC || {utf8, CC} <- CCs]; + _ -> + [] + end, + RKeys1 = case XHeaders of + #{<<"x-routing-key">> := {utf8, RK}} -> + [RK | RKeys0]; + _ -> + RKeys0 + end, + RKeys = case RKeys1 of + [] -> + [QName]; + _ -> + RKeys1 + end, + Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0), + Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1), Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2), case rabbit_amqp_filtex:filter(Filter, Mc) of true -> diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index f5a0f74b8932..607aa11473aa 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -304,10 +304,9 @@ target_per_message_exchange_routing_key(Config) -> Tag1 = Body1 = <<1>>, Tag2 = Body2 = <<2>>, - %% Although mc_amqp:essential_properties/1 parses these annotations, they should be ignored. + %% Although mc_amqp:essential_properties/1 parses the x-exchange annotation, it should be ignored. Msg1 = amqp10_msg:set_message_annotations( - #{<<"x-exchange">> => <<"ignored">>, - <<"x-routing-key">> => <<"ignored">>}, + #{<<"x-exchange">> => <<"ignored">>}, amqp10_msg:set_properties(#{to => To1}, amqp10_msg:new(Tag1, Body1))), Msg2 = amqp10_msg:set_properties(#{to => To2}, amqp10_msg:new(Tag2, Body2)), ok = amqp10_client:send_msg(Sender, Msg1), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index f192a0c309f8..91fa3abdc687 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -116,7 +116,8 @@ groups() -> available_messages_quorum_queue, available_messages_stream, incoming_message_interceptors, - trace, + trace_classic_queue, + trace_stream, user_id, message_ttl, plugin, @@ -156,7 +157,12 @@ groups() -> tcp_back_pressure_rabbitmq_internal_flow_quorum_queue, session_max_per_connection, link_max_per_session, - reserved_annotation + reserved_annotation, + x_cc_annotation_exchange, + x_cc_annotation_exchange_routing_key_empty, + x_cc_annotation_queue, + x_cc_annotation_null, + bad_x_cc_annotation_exchange ]}, {cluster_size_3, [shuffle], @@ -4393,16 +4399,26 @@ incoming_message_interceptors(Config) -> ok = amqp10_client:close_connection(Connection), true = rpc(Config, persistent_term, erase, [Key]). -trace(Config) -> +trace_classic_queue(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config). + +trace_stream(Config) -> + trace(atom_to_binary(?FUNCTION_NAME), <<"stream">>, Config). + +trace(Q, QType, Config) -> Node = atom_to_binary(get_node_config(Config, 0, nodename)), TraceQ = <<"my trace queue">>, - Q = <<"my queue">>, Qs = [Q, TraceQ], RoutingKey = <<"my routing key">>, Payload = <<"my payload">>, CorrelationId = <<"my correlation 👀"/utf8>>, Ch = rabbit_ct_client_helpers:open_channel(Config), - [#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q0}) || Q0 <- Qs], + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = Q, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, QType}]}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = TraceQ}), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, exchange = <<"amq.rabbitmq.trace">>, @@ -4420,16 +4436,21 @@ trace(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_on"]), {ok, SessionReceiver} = amqp10_client:begin_session_sync(Connection), + {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, + <<"test-receiver">>, + rabbitmq_amqp_address:queue(Q)), + receive {amqp10_event, {link, Receiver, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, {ok, Sender} = amqp10_client:attach_sender_link( SessionSender, <<"test-sender">>, rabbitmq_amqp_address:exchange(<<"amq.direct">>, RoutingKey)), ok = wait_for_credit(Sender), - {ok, Receiver} = amqp10_client:attach_receiver_link(SessionReceiver, - <<"test-receiver">>, - rabbitmq_amqp_address:queue(Q)), Msg0 = amqp10_msg:new(<<"tag 1">>, Payload, true), - Msg = amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0), + Msg = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"my CC key">>}]}}, + amqp10_msg:set_properties(#{correlation_id => CorrelationId}, Msg0)), ok = amqp10_client:send_msg(Sender, Msg), {ok, _} = amqp10_client:get_msg(Receiver), @@ -4439,7 +4460,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -4454,7 +4475,7 @@ trace(Config) -> payload = Payload}} = amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}), ?assertMatch(#{<<"exchange_name">> := <<"amq.direct">>, - <<"routing_keys">> := [RoutingKey], + <<"routing_keys">> := [RoutingKey, <<"my CC key">>], <<"connection">> := <<"127.0.0.1:", _/binary>>, <<"node">> := Node, <<"vhost">> := <<"/">>, @@ -5956,6 +5977,239 @@ reserved_annotation(Config) -> end, ok = close_connection_sync(Connection). +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange/:routing-key +x_cc_annotation_exchange(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key 1">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /exchanges/:exchange +x_cc_annotation_exchange_routing_key_empty(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key 1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"key 2">>, #{}), + AddressEmptyRoutingKey = rabbitmq_amqp_address:exchange(<<"amq.direct">>), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, AddressEmptyRoutingKey), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key 1">>}, + {utf8, <<"key 2">>}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address +%% /queues/:queue +x_cc_annotation_queue(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + Address1 = rabbitmq_amqp_address:queue(QName1), + Address2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address1), + ok = wait_for_credit(Sender), + + Payload = <<"my message">>, + ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, QName2}]}}, + amqp10_msg:new(<<"tag">>, Payload))), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:detach_link(Sender), + + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, Address1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, Address2, settled), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([Payload], amqp10_msg:body(Msg1)), + ?assertEqual([Payload], amqp10_msg:body(Msg2)), + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +%% Test that x-cc routing keys work together with target address 'null' +x_cc_annotation_null(Config) -> + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + QAddress1 = rabbitmq_amqp_address:queue(QName1), + QAddress2 = rabbitmq_amqp_address:queue(QName2), + {Connection, Session, LinkPair} = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.direct">>, <<"key-1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.direct">>, <<"🗝️-2"/utf8>>, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), + ok = wait_for_credit(Sender), + {ok, Receiver1} = amqp10_client:attach_receiver_link(Session, <<"receiver 1">>, QAddress1, settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link(Session, <<"receiver 2">>, QAddress2, settled), + + Msg1 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"key-1">>}, + {utf8, <<"key-3">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"🗝️-2"/utf8>>)}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = amqp10_client:send_msg(Sender, Msg1), + ok = wait_for_accepted(<<"t1">>), + {ok, R1M1} = amqp10_client:get_msg(Receiver1), + {ok, R2M1} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(R1M1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(R2M1)), + + Msg2 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"🗝️-2"/utf8>>}, + {utf8, <<"key-1">>}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.direct">>)}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = amqp10_client:send_msg(Sender, Msg2), + ok = wait_for_accepted(<<"t2">>), + {ok, R1M2} = amqp10_client:get_msg(Receiver1), + {ok, R2M2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m2">>], amqp10_msg:body(R1M2)), + ?assertEqual([<<"m2">>], amqp10_msg:body(R2M2)), + + Msg3 = amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t3">>, <<"m3">>))), + ok = amqp10_client:send_msg(Sender, Msg3), + ok = wait_for_accepted(<<"t3">>), + {ok, R1M3} = amqp10_client:get_msg(Receiver1), + {ok, R2M3} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m3">>], amqp10_msg:body(R1M3)), + ?assertEqual([<<"m3">>], amqp10_msg:body(R2M3)), + + Msg4 = amqp10_msg:set_message_annotations( + %% We send a symbol instead of utf8.. + #{<<"x-cc">> => {list, [{symbol, QName1}]}}, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName2)}, + amqp10_msg:new(<<"t4">>, <<"m4">>))), + ok = amqp10_client:send_msg(Sender, Msg4), + %% "If the source of the link supports the rejected outcome, and the message has not + %% already been settled by the sender, then the routing node MUST reject the message. + %% In this case the error field of rejected MUST contain the error which would have been communicated + %% in the detach which would have be sent if a link to the same address had been attempted." + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-routingerrors + receive {amqp10_disposition, {{rejected, Error}, <<"t4">>}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation:", _/binary>>}}, + Error) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver1), + ok = amqp10_client:detach_link(Receiver2), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + +bad_x_cc_annotation_exchange(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session(Connection), + + Address = rabbitmq_amqp_address:exchange(<<"amq.direct">>, <<"key-1">>), + {ok, Sender1} = amqp10_client:attach_sender_link(Session, <<"sender 1">>, Address), + ok = wait_for_credit(Sender1), + ok = amqp10_client:send_msg( + Sender1, + amqp10_msg:set_message_annotations( + %% We send an array instead of a list. + #{<<"x-cc">> => {array, utf8, [{utf8, <<"🗝️-2"/utf8>>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>))), + ok = wait_for_settlement(<<"t1">>, released), + receive {amqp10_event, {link, Sender1, {detached, Error1}}} -> + ?assertMatch( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{array,utf8,[{utf8,<<\"🗝️-2"/utf8, _Rest/binary>>}}, + Error1) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, Address), + ok = wait_for_credit(Sender2), + ok = amqp10_client:send_msg( + Sender2, + amqp10_msg:set_message_annotations( + %% We include a non-utf8 type in the list. + #{<<"x-cc">> => {list, [{symbol, <<"key-3">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>))), + ok = wait_for_settlement(<<"t2">>, released), + receive {amqp10_event, {link, Sender2, {detached, Error2}}} -> + ?assertEqual( + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD, + description = {utf8, <<"bad value for 'x-cc' message-annotation: " + "{list,[{symbol,<<\"key-3\">>}]}">>}}, + Error2) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = end_session_sync(Session), + ok = amqp10_client:close_connection(Connection). + %% internal %% diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index 6d0ad63b13d8..b793cb3abebd 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -177,15 +177,11 @@ end_per_group(Group, Config) -> init_per_testcase(T, Config) when T =:= dead_letter_reject_expire_expire orelse T =:= stream -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of - ok -> - init_per_testcase0(T, Config); - {skip, _} = Skip -> - %% With feature flag message_containers_deaths_v2 disabled, test case: - %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 - %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 - Skip - end; + %% With feature flag message_containers_deaths_v2 disabled, test case: + %% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159 + %% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173 + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2), + init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -1860,6 +1856,10 @@ stream(Config) -> {timestamp, T2} = rabbit_misc:table_lookup(Death2, <<"time">>), ?assert(T1 < T2), + ?assertEqual({array, [{longstr, <<"cc 1">>}, + {longstr, <<"cc 2">>}]}, + rabbit_misc:table_lookup(Headers, <<"CC">>)), + ok = rabbit_ct_client_helpers:close_channel(Ch0), ok = rabbit_ct_client_helpers:close_channel(Ch1). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index acc9ea69adfe..f8d10462e629 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -42,7 +42,9 @@ all_tests() -> amqp_amqpl_message_id_binary, amqp_amqpl_unsupported_values_not_converted, amqp_to_amqpl_data_body, - amqp_amqpl_amqp_bodies + amqp_amqpl_amqp_bodies, + amqp_x_headers, + amqpl_x_headers ]. %%%=================================================================== @@ -195,10 +197,7 @@ amqpl_table_x_header_array_of_tbls(_Config) -> [{{symbol, <<"type">>}, {utf8, <<"orange">>}}, {{symbol, <<"count">>}, {long, 45}}]} ]}, - mc:x_header(<<"x-fruit">>, Msg)), - - - ok. + mc:x_header(<<"x-fruit">>, Msg)). amqpl_death_v1_records(_Config) -> ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}). @@ -364,8 +363,9 @@ amqpl_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Payload, #{}), - ?assertEqual(<<"exch">>, mc:exchange(Msg10)), - ?assertEqual([<<"apple">>], mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}}, + mc:x_headers(Msg10)), ?assertEqual(98, mc:priority(Msg10)), ?assertEqual(true, mc:is_persistent(Msg10)), ?assertEqual(99000, mc:timestamp(Msg10)), @@ -422,8 +422,6 @@ amqpl_amqp_bin_amqpl(_Config) -> MsgL2 = mc:convert(mc_amqpl, Msg10), - ?assertEqual(<<"exch">>, mc:exchange(MsgL2)), - ?assertEqual([<<"apple">>], mc:routing_keys(MsgL2)), ?assertEqual(98, mc:priority(MsgL2)), ?assertEqual(true, mc:is_persistent(MsgL2)), ?assertEqual(99000, mc:timestamp(MsgL2)), @@ -450,9 +448,17 @@ amqpl_cc_amqp_bin_amqpl(_Config) -> Msg10Pre = mc:convert(mc_amqp, Msg), Sections = iolist_to_binary(mc:protocol_state(Msg10Pre)), Msg10 = mc:init(mc_amqp, Sections, #{}), - ?assertEqual(RoutingKeys, mc:routing_keys(Msg10)), + ?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>}, + <<"x-routing-key">> := {utf8, <<"apple">>}, + <<"x-cc">> := {list, [{utf8, <<"q1">>}, + {utf8, <<"q2">>}]}}, + mc:x_headers(Msg10)), - MsgL2 = mc:convert(mc_amqpl, Msg10), + %% Here, we simulate what rabbit_stream_queue does: + Msg10b = mc:set_annotation(?ANN_EXCHANGE, <<"exch">>, Msg10), + Msg10c = mc:set_annotation(?ANN_ROUTING_KEYS, [<<"apple">>, <<"q1">>, <<"q2">>], Msg10b), + + MsgL2 = mc:convert(mc_amqpl, Msg10c), ?assertEqual(RoutingKeys, mc:routing_keys(MsgL2)), ?assertMatch(#content{properties = #'P_basic'{headers = Headers}}, mc:protocol_state(MsgL2)). @@ -751,6 +757,52 @@ amqp_amqpl_amqp_bodies(_Config) -> end || Body <- Bodies], ok. +amqp_x_headers(_Config) -> + MAC = [ + {{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}}, + thead2('x-list', list, [utf8(<<"l">>)]), + thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) + ], + M = #'v1_0.message_annotations'{content = MAC}, + AC = [thead(long, 5)], + A = #'v1_0.application_properties'{content = AC}, + D = #'v1_0.data'{content = <<"data">>}, + + Payload = serialize_sections([M, A, D]), + Msg0 = mc:init(mc_amqp, Payload, annotations()), + Msg1 = mc:set_annotation(<<"x-1">>, {byte, -2}, Msg0), + ?assertEqual(#{<<"x-1">> => {byte, -2}, + <<"x-list">> => {list,[{utf8,<<"l">>}]}, + <<"x-map">> => {map,[{{utf8,<<"k">>},{utf8,<<"v">>}}]}, + <<"x-stream-filter">> => {utf8,<<"apple">>}}, + mc:x_headers(Msg1)). + +amqpl_x_headers(_Config) -> + Props = #'P_basic'{headers = [{<<"a-string">>, longstr, <<"a string">>}, + {<<"x-1">>, binary, <<"v1">>}, + {<<"x-stream-filter">>, longstr, <<"apple">>}]}, + Payload = [<<"data">>], + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + + Msg0 = mc:init(mc_amqpl, Content, annotations()), + Msg1 = mc:set_annotation(delivery_count, 1, Msg0), + Msg = mc:set_annotation(<<"x-delivery-count">>, 2, Msg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(Msg)), + + XName = <<"exch">>, + RoutingKey = <<"apple">>, + {ok, BasicMsg0} = rabbit_basic:message_no_id(XName, RoutingKey, Content), + BasicMsg1 = mc:set_annotation(delivery_count, 1, BasicMsg0), + BasicMsg = mc:set_annotation(<<"x-delivery-count">>, 2, BasicMsg1), + ?assertEqual(#{<<"x-1">> => {binary, <<"v1">>}, + <<"x-stream-filter">> => {utf8,<<"apple">>}, + <<"x-delivery-count">> => {long, 2}}, + mc:x_headers(BasicMsg)). + %% Utility amqp10_encode_bin(L) when is_list(L) -> diff --git a/deps/rabbit/test/topic_permission_SUITE.erl b/deps/rabbit/test/topic_permission_SUITE.erl index 2849b76fd3b9..b7c2e10b2421 100644 --- a/deps/rabbit/test/topic_permission_SUITE.erl +++ b/deps/rabbit/test/topic_permission_SUITE.erl @@ -8,6 +8,7 @@ -module(topic_permission_SUITE). -include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -compile([export_all, nowarn_export_all]). @@ -21,6 +22,7 @@ groups() -> [ {sequential_tests, [], [ + amqp_x_cc_annotation, amqpl_cc_headers, amqpl_bcc_headers, topic_permission_database_access, @@ -29,6 +31,7 @@ groups() -> ]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(amqp10_client), rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config( Config, @@ -56,6 +59,91 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +amqp_x_cc_annotation(Config) -> + ok = set_topic_permissions(Config, "^a", ".*"), + + QName1 = <<"queue 1">>, + QName2 = <<"queue 2">>, + {Connection, Session1, LinkPair} = amqp_utils:init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName1, #{}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName2, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName1, <<"amq.topic">>, <<"a.1">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName2, <<"amq.topic">>, <<"a.2">>, #{}), + + {ok, Sender1} = amqp10_client:attach_sender_link( + Session1, + <<"sender 1">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender1), + {ok, Receiver1} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 1">>, rabbitmq_amqp_address:queue(QName1), settled), + {ok, Receiver2} = amqp10_client:attach_receiver_link( + Session1, <<"receiver 2">>, rabbitmq_amqp_address:queue(QName2), settled), + %% We have permissions to send to both topics. + %% Therefore, m1 should be sent to both queues. + ok = amqp10_client:send_msg(Sender1, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t1">>, <<"m1">>, true))), + {ok, Msg1} = amqp10_client:get_msg(Receiver1), + {ok, Msg2} = amqp10_client:get_msg(Receiver2), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg2)), + ok = amqp_utils:detach_link_sync(Sender1), + ok = amqp_utils:detach_link_sync(Receiver1), + ok = amqp_utils:detach_link_sync(Receiver2), + + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link( + Session2, + <<"sender 2">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"x.1">>)), + ok = amqp_utils:wait_for_credit(Sender2), + ok = amqp10_client:send_msg(Sender2, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"a.2">>}]}}, + amqp10_msg:new(<<"t2">>, <<"m2">>, true))), + receive + {amqp10_event, + {session, Session2, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description1}}}}} -> + ?assertEqual( + <<"write access to topic 'x.1' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description1) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, Session3} = amqp10_client:begin_session_sync(Connection), + {ok, Sender3} = amqp10_client:attach_sender_link( + Session3, + <<"sender 3">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"a.1">>)), + ok = amqp_utils:wait_for_credit(Sender3), + ok = amqp10_client:send_msg(Sender3, amqp10_msg:set_message_annotations( + #{<<"x-cc">> => {list, [{utf8, <<"x.2">>}]}}, + amqp10_msg:new(<<"t3">>, <<"m3">>, true))), + receive + {amqp10_event, + {session, Session3, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description2}}}}} -> + ?assertEqual( + <<"write access to topic 'x.2' in exchange 'amq.topic' in vhost '/' refused for user 'guest'">>, + Description2) + after 5000 -> amqp_utils:flush(missing_ended), + ct:fail({missing_event, ?LINE}) + end, + + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName1), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName2), + ok = amqp_utils:end_session_sync(Session1), + ok = amqp10_client:close_connection(Connection), + ok = clear_topic_permissions(Config). + amqpl_cc_headers(Config) -> amqpl_headers(<<"CC">>, Config). diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index b6cae214c8c3..656b44dd8b7b 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -14,6 +14,7 @@ init/1, size/1, x_header/2, + x_headers/1, property/2, routing_headers/2, convert_to/3, @@ -390,6 +391,11 @@ x_header(Key, #mqtt_msg{props = #{'User-Property' := UserProp}}) -> x_header(_Key, #mqtt_msg{}) -> undefined. +x_headers(#mqtt_msg{props = #{'User-Property' := UserProp}}) -> + #{Key => {utf8, Val} || {<<"x-", _/binary>> = Key, Val} <- UserProp}; +x_headers(#mqtt_msg{}) -> + #{}. + property(correlation_id, #mqtt_msg{props = #{'Correlation-Data' := Corr}}) -> case mc_util:urn_string_to_uuid(Corr) of {ok, UUId} -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index 14d88f357602..c6d1308e9ad2 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -61,6 +61,10 @@ roundtrip_amqp(_Config) -> PayloadSize = 10, ExpectedSize = {MetaDataSize, PayloadSize}, ?assertEqual(ExpectedSize, mc:size(Mc0)), + ?assertEqual(#{<<"x-key-1">> => {utf8, <<"val-1">>}, + <<"x-key-2">> => {utf8, <<"val-2">>}, + <<"x-key-3">> => {utf8, <<"val-3">>}}, + mc:x_headers(Mc0)), Env = #{}, ?assertEqual(Msg, mc_mqtt:convert_to(mc_mqtt, Msg, Env)), @@ -310,6 +314,7 @@ mqtt_amqpl_alt(_Config) -> }, Anns = #{?ANN_ROUTING_KEYS => [rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic)]}, Mc = mc:init(mc_mqtt, Msg, Anns), + ?assertEqual(#{}, mc:x_headers(Mc)), MsgL = mc:convert(mc_amqpl, Mc), #content{properties = #'P_basic'{headers = HL} = Props} = diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index ca80cfa59630..b4fe0f8b56cc 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -18,6 +18,12 @@ This feature: This feature allows operators to gain insights into the message sizes being published to RabbitMQ, such as average message size, number of messages per pre-defined bucket (which can both be computed accurately), and percentiles (which will be approximated). Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, and MQTT 3.1). +## New Features + +### Support for Multiple Routing Keys in AMQP 1.0 via `x-cc` Message Annotation +[PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. +This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB). From c476540bbcb0ea171fdf780c32559360cb091827 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 24 Oct 2024 16:00:00 +0200 Subject: [PATCH 4/7] Remove test flake Prior to this commit tests * leader_transfer_quorum_queue_credit_single * leader_transfer_quorum_queue_credit_batches flaked in CI during 4.1 (main) and 4.0 mixed version testing. The follwing error occurred on node 0: ``` [error] <0.1950.0> Timed out waiting for credit reply from quorum queue 'leader_transfer_quorum_queue_credit_batches' in vhost '/'. Hint: Enable feature flag rabbitmq_4.0.0 [warning] <0.1950.0> Closing session for connection <0.1945.0>: {'v1_0.error', [warning] <0.1950.0> {symbol,<<"amqp:internal-error">>}, [warning] <0.1950.0> {utf8, [warning] <0.1950.0> <<"Timed out waiting for credit reply from quorum queue 'leader_transfer_quorum_queue_credit_batches' in vhost '/'. Hint: Enable feature flag rabbitmq_4.0.0">>}, [warning] <0.1950.0> undefined} ``` Therefore we enable this feature flag for both tests. This commit also simplifies some test setups that were necessary for 4.0/3.13 mixed version testing, but isn't necessary anymore for 4.1/4.0 mixed version testing. --- deps/rabbit/test/amqp_client_SUITE.erl | 50 ++++++++------------------ 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 91fa3abdc687..8d023b7cb2f5 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -303,12 +303,15 @@ init_per_testcase(T, Config) when T =:= detach_requeues_one_session_quorum_queue orelse T =:= single_active_consumer_quorum_queue orelse T =:= detach_requeues_two_connections_quorum_queue -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of - ok -> - rabbit_ct_helpers:testcase_started(Config, T); - {skip, _} -> - {skip, "Feature flag rabbitmq_4.0.0 enables the consumer removal API"} - end; + %% Feature flag rabbitmq_4.0.0 enables the consumer removal API. + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0'), + rabbit_ct_helpers:testcase_started(Config, T); +init_per_testcase(T, Config) + when T =:= leader_transfer_quorum_queue_credit_single orelse + T =:= leader_transfer_quorum_queue_credit_batches -> + %% These test cases flake with feature flag 'rabbitmq_4.0.0' disabled. + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0'), + rabbit_ct_helpers:testcase_started(Config, T); init_per_testcase(T = immutable_bare_message, Config) -> case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of true -> @@ -333,26 +336,6 @@ init_per_testcase(T = dead_letter_reject, Config) -> {skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled " "due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"} end; -init_per_testcase(T, Config) - when T =:= leader_transfer_quorum_queue_credit_single orelse - T =:= leader_transfer_quorum_queue_credit_batches orelse - T =:= leader_transfer_stream_credit_single orelse - T =:= leader_transfer_stream_credit_batches orelse - T =:= leader_transfer_quorum_queue_send orelse - T =:= leader_transfer_stream_send -> - case rpc(Config, rabbit_feature_flags, is_supported, ['rabbitmq_4.0.0']) of - true -> - rabbit_ct_helpers:testcase_started(Config, T); - false -> - {skip, "This test requires the AMQP management extension of RabbitMQ 4.0"} - end; -init_per_testcase(T, Config) - when T =:= classic_queue_on_new_node orelse - T =:= quorum_queue_on_new_node -> - %% If node 1 runs 4.x, this is the new no-op plugin. - %% If node 1 runs 3.x, this is the old real plugin. - ok = rabbit_ct_broker_helpers:enable_plugin(Config, 1, rabbitmq_amqp1_0), - rabbit_ct_helpers:testcase_started(Config, T); init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -3543,14 +3526,11 @@ async_notify_settled_stream(Config) -> async_notify(settled, <<"stream">>, Config). async_notify_unsettled_classic_queue(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0') of - ok -> - async_notify(unsettled, <<"classic">>, Config); - {skip, _} -> - {skip, "Skipping as this test will flake. Link flow control in classic " - "queues with credit API v1 is known to be broken: " - "https://github.com/rabbitmq/rabbitmq-server/issues/2597"} - end. + %% This test flakes with feature flag 'rabbitmq_4.0.0' disabled. + %% Link flow control in classic queues with credit API v1 is known to be broken: + %% https://github.com/rabbitmq/rabbitmq-server/issues/2597 + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.0.0'), + async_notify(unsettled, <<"classic">>, Config). async_notify_unsettled_quorum_queue(Config) -> async_notify(unsettled, <<"quorum">>, Config). @@ -3852,7 +3832,6 @@ leader_transfer_credit(QName, QType, Credit, Config) -> ok = end_session_sync(Session1), ok = close_connection_sync(Connection1), - %% Consume from a follower. OpnConf = connection_config(0, Config), {ok, Connection0} = amqp10_client:open_connection(OpnConf), {ok, Session0} = amqp10_client:begin_session_sync(Connection0), @@ -3866,6 +3845,7 @@ leader_transfer_credit(QName, QType, Credit, Config) -> ok = wait_for_accepts(NumMsgs), ok = detach_link_sync(Sender), + %% Consume from a follower. ok = wait_for_local_member(QType, QName, Config), Filter = consume_from_first(QType), {ok, Receiver} = amqp10_client:attach_receiver_link( From b1169d06ba7dc22059cddc3dfb645ca620e3c132 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 24 Oct 2024 18:09:52 +0200 Subject: [PATCH 5/7] Delete test access_failure This test flakes in CI as described in https://github.com/rabbitmq/rabbitmq-server/issues/12413#issuecomment-2419293869 The test case fails with ``` Node: rabbit_shard2@localhost Case: amqp_system_SUITE:access_failure Reason: {error,{{badmatch,{error,134, "Unhandled exception. System.Exception: expected exception not received at Program.Test.accessFailure(String uri) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 477 at Program.main(String[] argv) in /home/runner/work/rabbitmq-server/rabbitmq-server/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs:line 509\n"}}, [{amqp_system_SUITE,run_dotnet_test,2, [{file,"amqp_system_SUITE.erl"}, {line,257}]}, ``` However, RabbitMQ closes the session as expected due to the missing read permissions to the queue as shown in the RabbitMQ logs: ``` [debug] <0.1321.0> Asked to create a new user 'access_failure', password length in bytes: 24 [info] <0.1321.0> Created user 'access_failure' [debug] <0.1324.0> Asked to set permissions for user 'access_failure' in virtual host '/' to '.*', '^banana.*', '^banana.*' [info] <0.1324.0> Successfully set permissions for user 'access_failure' in virtual host '/' to '.*', '^banana.*', '^banana.*' [info] <0.1333.0> accepting AMQP connection 127.0.0.1:36248 -> 127.0.0.1:25000 [debug] <0.1333.0> User 'access_failure' authenticated successfully by backend rabbit_auth_backend_internal [info] <0.1333.0> Connection from AMQP 1.0 container 'AMQPNetLite-101d7d51': user 'access_failure' authenticated using SASL mechanism PLAIN and granted access to vhost '/' [debug] <0.1333.0> AMQP 1.0 connection.open frame: hostname = 127.0.0.1, extracted vhost = /, idle-time-out = undefined [debug] <0.1333.0> AMQP 1.0 created session process <0.1338.0> for channel number 0 [warning] <0.1338.0> Closing session for connection <0.1333.0>: {'v1_0.error', [warning] <0.1338.0> {symbol, [warning] <0.1338.0> <<"amqp:unauthorized-access">>}, [warning] <0.1338.0> {utf8, [warning] <0.1338.0> <<"read access to queue 'test' in vhost '/' refused for user 'access_failure'">>}, [warning] <0.1338.0> undefined} [debug] <0.1333.0> AMQP 1.0 closed session process <0.1338.0> with channel number 0 [warning] <0.1333.0> closing AMQP connection <0.1333.0> (127.0.0.1:36248 -> 127.0.0.1:25000, duration: '269ms'): [warning] <0.1333.0> client unexpectedly closed TCP connection ``` ``` let receiver = ReceiverLink(ac.Session, "test-receiver", src) ``` uses a null constructur for the onAttached callback. ReceiverLink doesn't seem to block. Given that the exact same authorization error is already tested in test case attach_source_queue of amqp_auth_SUITE, it's safe to delete this F# test. --- deps/rabbit/test/amqp_system_SUITE.erl | 13 ------------- .../fsharp-tests/Program.fs | 17 ----------------- 2 files changed, 30 deletions(-) diff --git a/deps/rabbit/test/amqp_system_SUITE.erl b/deps/rabbit/test/amqp_system_SUITE.erl index 37f9b3ac102d..c7dfb00af4b0 100644 --- a/deps/rabbit/test/amqp_system_SUITE.erl +++ b/deps/rabbit/test/amqp_system_SUITE.erl @@ -38,7 +38,6 @@ groups() -> routing, invalid_routes, auth_failure, - access_failure, access_failure_not_allowed, access_failure_send, streams @@ -217,18 +216,6 @@ invalid_routes(Config) -> auth_failure(Config) -> run(Config, [ {dotnet, "auth_failure"} ]). -access_failure(Config) -> - User = atom_to_binary(?FUNCTION_NAME), - ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), - ok = rabbit_ct_broker_helpers:set_permissions(Config, User, <<"/">>, - <<".*">>, %% configure - <<"^banana.*">>, %% write - <<"^banana.*">> %% read - ), - run(Config, [ {dotnet, "access_failure"} ]), - ok = rabbit_ct_broker_helpers:delete_user(Config, User). - - access_failure_not_allowed(Config) -> User = atom_to_binary(?FUNCTION_NAME), ok = rabbit_ct_broker_helpers:add_user(Config, User, <<"boo">>), diff --git a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs index 453406b84253..aa6a2fd0b713 100755 --- a/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbit/test/amqp_system_SUITE_data/fsharp-tests/Program.fs @@ -466,20 +466,6 @@ module Test = printfn "Exception %A" ex () - let accessFailure uri = - try - let u = Uri uri - let uri = sprintf "amqp://access_failure:boo@%s:%i" u.Host u.Port - use ac = connect uri - let src = "/queues/test" - let receiver = ReceiverLink(ac.Session, "test-receiver", src) - receiver.Close() - failwith "expected exception not received" - with - | :? Amqp.AmqpException as ex -> - printfn "Exception %A" ex - () - let accessFailureNotAllowed uri = try let u = Uri uri @@ -505,9 +491,6 @@ let main argv = | [AsLower "auth_failure"; uri] -> authFailure uri 0 - | [AsLower "access_failure"; uri] -> - accessFailure uri - 0 | [AsLower "access_failure_not_allowed"; uri] -> accessFailureNotAllowed uri 0 From 55a055550871afa03709077eff7301d95aac7b9a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:07:04 +0000 Subject: [PATCH 6/7] build(deps): bump org.springframework.boot:spring-boot-starter-parent Bumps [org.springframework.boot:spring-boot-starter-parent](https://github.com/spring-projects/spring-boot) from 3.3.4 to 3.3.5. - [Release notes](https://github.com/spring-projects/spring-boot/releases) - [Commits](https://github.com/spring-projects/spring-boot/compare/v3.3.4...v3.3.5) --- updated-dependencies: - dependency-name: org.springframework.boot:spring-boot-starter-parent dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- .../examples/rabbitmq_auth_backend_spring_boot_kotlin/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin/pom.xml b/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin/pom.xml index d20891e49dba..67df8a9b4be3 100644 --- a/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin/pom.xml +++ b/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot_kotlin/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 3.3.4 + 3.3.5 From cca22ca577cac7ca6461cf587c2c5fd4e04d556e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:07:36 +0000 Subject: [PATCH 7/7] build(deps): bump org.springframework.boot:spring-boot-starter-parent Bumps [org.springframework.boot:spring-boot-starter-parent](https://github.com/spring-projects/spring-boot) from 3.3.4 to 3.3.5. - [Release notes](https://github.com/spring-projects/spring-boot/releases) - [Commits](https://github.com/spring-projects/spring-boot/compare/v3.3.4...v3.3.5) --- updated-dependencies: - dependency-name: org.springframework.boot:spring-boot-starter-parent dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- .../examples/rabbitmq_auth_backend_spring_boot/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot/pom.xml b/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot/pom.xml index 457c10f2b483..b7489fdc98f0 100644 --- a/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot/pom.xml +++ b/deps/rabbitmq_auth_backend_http/examples/rabbitmq_auth_backend_spring_boot/pom.xml @@ -29,7 +29,7 @@ org.springframework.boot spring-boot-starter-parent - 3.3.4 + 3.3.5