Skip to content

Commit

Permalink
Set capabilities parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcialRosales committed Oct 23, 2024
1 parent 64a3a31 commit b00c268
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 8 deletions.
17 changes: 15 additions & 2 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ amqp10_src_validation(_Def, User) ->
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-address">>, fun rabbit_parameter_validation:binary/2, mandatory},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-delete-after">>, fun validate_delete_after/2, optional}
{<<"src-delete-after">>, fun validate_delete_after/2, optional},
{<<"src-capabilities">>, fun validate_link_capabilities/2, optional}
].

amqp091_src_validation(_Def, User) ->
Expand Down Expand Up @@ -168,7 +169,8 @@ amqp10_dest_validation(_Def, User) ->
{<<"dest-application-properties">>, fun validate_amqp10_map/2, optional},
{<<"dest-message-annotations">>, fun validate_amqp10_map/2, optional},
% TODO: restrict to allowed fields
{<<"dest-properties">>, fun validate_amqp10_map/2, optional}
{<<"dest-properties">>, fun validate_amqp10_map/2, optional},
{<<"dest-capabilities">>, fun validate_link_capabilities/2, optional}
].

amqp091_dest_validation(_Def, User) ->
Expand Down Expand Up @@ -208,6 +210,17 @@ validate_uri(Name, Term, User) ->
E -> E
end.

validate_link_capabilities(Capabilities, Term) ->
case rabbit_parameter_validation:list(Capabilities, Term) of
ok -> case [V || C <- Capabilities,
V <- [rabbit_parameter_validation:binary(C, Term)],
element(1, V) =:= error] of
[] -> ok;
[E | _] -> E
end;
E -> E
end.

validate_params_user(#amqp_params_direct{}, none) ->
ok;
validate_params_user(#amqp_params_direct{virtual_host = VHost},
Expand Down
51 changes: 45 additions & 6 deletions deps/rabbitmq_shovel/test/parameters_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ groups() ->
parse_amqp10,
parse_amqp10_minimal,
validate_amqp10,
validate_amqp10_with_a_map
validate_amqp10_with_a_map,
validate_amqp10_with_wrong_capabilities_type
]}
].

Expand Down Expand Up @@ -230,6 +231,7 @@ parse_amqp10(_Config) ->
{<<"src-address">>, <<"a-src-queue">>},
{<<"src-delete-after">>, <<"never">>},
{<<"src-prefetch-count">>, 30},
{<<"src-capabilities">>, []},

{<<"dest-protocol">>, <<"amqp10">>},
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
Expand All @@ -240,7 +242,8 @@ parse_amqp10(_Config) ->
<<"app-prop-value">>}]},
{<<"dest-message-annotations">>, [{<<"some-message-ann">>,
<<"message-ann-value">>}]},
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]}
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]},
{<<"dest-capabilities">>, [<<"one">>, <<"two">>]}
],
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
?assertMatch(
Expand All @@ -250,7 +253,8 @@ parse_amqp10(_Config) ->
uris := ["amqp://localhost:5672"],
delete_after := never,
prefetch_count := 30,
source_address := <<"a-src-queue">>
source_address := <<"a-src-queue">>,
receiver_capabilities := []
},
dest := #{module := rabbit_amqp10_shovel,
uris := ["amqp://remotehost:5672"],
Expand All @@ -261,7 +265,8 @@ parse_amqp10(_Config) ->
<<"app-prop-value">>},
properties := #{user_id := <<"some-user">>},
add_timestamp_header := true,
add_forward_headers := true
add_forward_headers := true,
sender_capabilities := [<<"one">>, <<"two">>]
}
}},
rabbit_shovel_parameters:parse({"vhost", "my_shovel"}, "my-cluster",
Expand Down Expand Up @@ -327,6 +332,34 @@ validate_amqp10(_Config) ->
[] = validate_ok(Res),
ok.

validate_amqp10_with_wrong_capabilities_type(_Config) ->
Params =
#{
<<"ack-mode">> => <<"on-publish">>,
<<"reconnect-delay">> => 1001,

<<"src-protocol">> => <<"amqp10">>,
<<"src-uri">> => <<"amqp://localhost:5672">>,
<<"src-address">> => <<"a-src-queue">>,
<<"src-delete-after">> => <<"never">>,
<<"src-prefetch-count">> => 30,
<<"src-capabilities">> => <<"one">>,

<<"dest-protocol">> => <<"amqp10">>,
<<"dest-uri">> => <<"amqp://remotehost:5672">>,
<<"dest-address">> => <<"a-dest-queue">>,
<<"dest-add-forward-headers">> => true,
<<"dest-add-timestamp-header">> => true,
<<"dest-application-properties">> => [{<<"some-app-prop">>,
<<"app-prop-value">>}],
<<"dest-message-annotations">> => [{<<"some-message-ann">>, <<"message-ann-value">>}],
<<"dest-properties">> => #{<<"user_id">> => <<"some-user">>}
},
Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
"my-shovel", Params, none),
{error, _, [<<"src-capabilities">>, <<"one">>]} = find_first_error(Res).


validate_amqp10_with_a_map(_Config) ->
Params =
#{
Expand All @@ -352,8 +385,7 @@ validate_amqp10_with_a_map(_Config) ->

Res = rabbit_shovel_parameters:validate("my-vhost", <<"shovel">>,
"my-shovel", Params, none),
[] = validate_ok(Res),
ok.
validate_ok(Res).

validate_ok([ok | T]) ->
validate_ok(T);
Expand All @@ -362,3 +394,10 @@ validate_ok([[_|_] = L | T]) ->
validate_ok([]) -> [];
validate_ok(X) ->
exit({not_ok, X}).
find_first_error([]) ->
undefined;
find_first_error([V | T]) ->
case V of
{error, _, _} = E -> E;
_ -> find_first_error(T)
end.

0 comments on commit b00c268

Please sign in to comment.