diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2021-02-10 06:13:18 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2021-02-10 06:13:18 +0300 |
commit | 927a9ddb52dab71ab5c2592bb6f07bfc90ad4615 (patch) | |
tree | a45e2f89c03be166c315a51a5f21405760db6573 | |
parent | 6c6586346b3133c17b3f66db095a58ae68f21995 (diff) | |
download | rabbitmq-server-git-rabbitmq-server-2798.tar.gz |
Make it possible to specify optional queue arguments for dynamic Shovelsrabbitmq-server-2798
when shovels declare queues, it is currently not possible to declare
a quorum queue.
Closes #2798.
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 111 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_parameter_validation.erl | 15 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl | 48 | ||||
-rw-r--r-- | deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 15 |
5 files changed, 152 insertions, 39 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index a8422b3da3..6a5903f679 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -55,7 +55,7 @@ -export([rebalance/3]). -export([collect_info_all/2]). --export([is_policy_applicable/2]). +-export([is_policy_applicable/2, declare_args/0]). -export([is_server_named_allowed/1]). -export([check_max_age/1]). @@ -794,16 +794,29 @@ check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} - end. + end; +check_int_arg(Val, _) when is_integer(Val) -> + ok; +check_int_arg(_Val, _) -> + {error, {unacceptable_type, "expected integer"}}. check_bool_arg({bool, _}, _) -> ok; -check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. +check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}; +check_bool_arg(true, _) -> ok; +check_bool_arg(false, _) -> ok; +check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}. check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; ok -> {error, {value_negative, Val}}; Error -> Error + end; +check_non_neg_int_arg(Val, Args) -> + case check_int_arg(Val, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error end. check_expires_arg({Type, Val}, Args) -> @@ -811,12 +824,23 @@ check_expires_arg({Type, Val}, Args) -> ok when Val == 0 -> {error, {value_zero, Val}}; ok -> rabbit_misc:check_expiry(Val); Error -> Error + end; +check_expires_arg(Val, Args) -> + case check_int_arg(Val, Args) of + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. check_message_ttl_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok -> rabbit_misc:check_expiry(Val); Error -> Error + end; +check_message_ttl_arg(Val, Args) -> + case check_int_arg(Val, Args) of + ok -> rabbit_misc:check_expiry(Val); + Error -> Error end. check_max_priority_arg({Type, Val}, Args) -> @@ -824,19 +848,30 @@ check_max_priority_arg({Type, Val}, Args) -> ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; ok -> {error, {max_value_exceeded, Val}}; Error -> Error + end; +check_max_priority_arg(Val, Args) -> + case check_non_neg_int_arg(Val, Args) of + ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; + ok -> {error, {max_value_exceeded, Val}}; + Error -> Error end. check_single_active_consumer_arg({Type, Val}, Args) -> - case check_bool_arg({Type, Val}, Args) of - ok -> ok; - Error -> Error - end. + check_bool_arg({Type, Val}, Args); +check_single_active_consumer_arg(Val, Args) -> + check_bool_arg(Val, Args). check_initial_cluster_size_arg({Type, Val}, Args) -> case check_non_neg_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; ok -> ok; Error -> Error + end; +check_initial_cluster_size_arg(Val, Args) -> + case check_non_neg_int_arg(Val, Args) of + ok when Val == 0 -> {error, {value_zero, Val}}; + ok -> ok; + Error -> Error end. check_max_age_arg({longstr, Val}, _Args) -> @@ -884,7 +919,9 @@ unit_value_in_ms("s") -> %% Note that the validity of x-dead-letter-exchange is already verified %% by rabbit_channel's queue.declare handler. check_dlxname_arg({longstr, _}, _) -> ok; -check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. +check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}; +check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok; +check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}. check_dlxrk_arg({longstr, _}, Args) -> case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of @@ -892,7 +929,14 @@ check_dlxrk_arg({longstr, _}, Args) -> _ -> ok end; check_dlxrk_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. + {error, {unacceptable_type, Type}}; +check_dlxrk_arg(Val, Args) when is_binary(Val) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of + undefined -> {error, routing_key_but_no_dlx_defined}; + _ -> ok + end; +check_dlxrk_arg(_Val, _Args) -> + {error, {unacceptable_type, "expected a string"}}. check_overflow({longstr, Val}, _Args) -> case lists:member(Val, [<<"drop-head">>, @@ -902,33 +946,64 @@ check_overflow({longstr, Val}, _Args) -> false -> {error, invalid_overflow} end; check_overflow({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. + {error, {unacceptable_type, Type}}; +check_overflow(Val, _Args) -> + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of + true -> ok; + false -> {error, invalid_overflow} + end; +check_overflow(_Val, _Args) -> + {error, invalid_overflow}. +-define(KNOWN_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]). check_queue_leader_locator_arg({longstr, Val}, _Args) -> - case lists:member(Val, [<<"client-local">>, - <<"random">>, - <<"least-leaders">>]) of + case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of true -> ok; false -> {error, invalid_queue_locator_arg} end; check_queue_leader_locator_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. + {error, {unacceptable_type, Type}}; +check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) -> + case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of + true -> ok; + false -> {error, invalid_queue_locator_arg} + end; +check_queue_leader_locator_arg(_Val, _Args) -> + {error, invalid_queue_locator_arg}. +-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]). check_queue_mode({longstr, Val}, _Args) -> - case lists:member(Val, [<<"default">>, <<"lazy">>]) of + case lists:member(Val, ?KNOWN_QUEUE_MODES) of true -> ok; false -> {error, invalid_queue_mode} end; check_queue_mode({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. + {error, {unacceptable_type, Type}}; +check_queue_mode(Val, _Args) when is_binary(Val) -> + case lists:member(Val, ?KNOWN_QUEUE_MODES) of + true -> ok; + false -> {error, invalid_queue_mode} + end; +check_queue_mode(_Val, _Args) -> + {error, invalid_queue_mode}. +-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]). check_queue_type({longstr, Val}, _Args) -> - case lists:member(Val, [<<"classic">>, <<"quorum">>, <<"stream">>]) of + case lists:member(Val, ?KNOWN_QUEUE_TYPES) of true -> ok; false -> {error, invalid_queue_type} end; check_queue_type({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. + {error, {unacceptable_type, Type}}; +check_queue_type(Val, _Args) when is_binary(Val) -> + case lists:member(Val, ?KNOWN_QUEUE_TYPES) of + true -> ok; + false -> {error, invalid_queue_type} + end; +check_queue_type(_Val, _Args) -> + {error, invalid_queue_type}. -spec list() -> [amqqueue:amqqueue()]. diff --git a/deps/rabbit/src/rabbit_parameter_validation.erl b/deps/rabbit/src/rabbit_parameter_validation.erl index 0fc274a06b..6c0852476a 100644 --- a/deps/rabbit/src/rabbit_parameter_validation.erl +++ b/deps/rabbit/src/rabbit_parameter_validation.erl @@ -50,7 +50,20 @@ regex(Name, Term) -> proplist(Name, Constraints, Term) when is_list(Term) -> {Results, Remainder} = lists:foldl( - fun ({Key, Fun, Needed}, {Results0, Term0}) -> + %% if the optional/mandatory flag is not provided in a constraint tuple, + %% assume 'optional' + fun ({Key, Fun}, {Results0, Term0}) -> + case lists:keytake(Key, 1, Term0) of + {value, {Key, Value}, Term1} -> + {[Fun(Key, Value) | Results0], + Term1}; + {value, {Key, Type, Value}, Term1} -> + {[Fun(Key, Type, Value) | Results0], + Term1}; + false -> + {Results0, Term0} + end; + ({Key, Fun, Needed}, {Results0, Term0}) -> case {lists:keytake(Key, 1, Term0), Needed} of {{value, {Key, Value}, Term1}, _} -> {[Fun(Key, Value) | Results0], diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index 8de6a3ecb1..a9be15914f 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -48,7 +48,7 @@ ack_mode => ack_mode(), atom() => term()}. --export_type([state/0, source_config/0, dest_config/0, uri/0]). +-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]). -callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) -> source_config() | dest_config(). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index c199e30859..fae76be256 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -116,12 +116,13 @@ amqp10_src_validation(_Def, User) -> amqp091_src_validation(_Def, User) -> [ - {<<"src-uri">>, validate_uri_fun(User), mandatory}, - {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, - {<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, - {<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional}, - {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional}, - {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2,optional}, + {<<"src-uri">>, validate_uri_fun(User), mandatory}, + {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"src-queue-args">>, fun validate_queue_args/2, optional}, + {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, + {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, %% a deprecated pre-3.7 setting {<<"delete-after">>, fun validate_delete_after/2, optional}, %% currently used multi-protocol friend name, introduced in 3.7 @@ -151,6 +152,7 @@ amqp091_dest_validation(_Def, User) -> {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional}, {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional}, {<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional}, + {<<"dest-queue-args">>, fun validate_queue_args/2, optional}, {<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, {<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional}, {<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional}, @@ -206,6 +208,11 @@ validate_delete_after(Name, Term) -> {error, "~s should be number, \"never\" or \"queue-length\", actually was " "~p", [Name, Term]}. +validate_queue_args(Name, Term0) -> + Term = rabbit_data_coercion:to_proplist(Term0), + + rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term). + validate_amqp10_map(Name, Terms0) -> Terms = rabbit_data_coercion:to_proplist(Terms0), Str = fun rabbit_parameter_validation:binary/2, @@ -292,14 +299,15 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) -> }. parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) -> - DestURIs = get_uris(<<"dest-uri">>, Def), - DestX = pget(<<"dest-exchange">>, Def, none), - DestXKey = pget(<<"dest-exchange-key">>, Def, none), - DestQ = pget(<<"dest-queue">>, Def, none), + DestURIs = get_uris(<<"dest-uri">>, Def), + DestX = pget(<<"dest-exchange">>, Def, none), + DestXKey = pget(<<"dest-exchange-key">>, Def, none), + DestQ = pget(<<"dest-queue">>, Def, none), + DestQArgs = pget(<<"dest-queue-args">>, Def, #{}), DestDeclFun = fun (Conn, _Ch) -> case DestQ of none -> ok; - _ -> ensure_queue(Conn, DestQ) + _ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs)) end end, {X, Key} = case DestQ of @@ -371,10 +379,11 @@ parse_amqp10_source(Def) -> prefetch_count => PrefetchCount}, Headers}. parse_amqp091_source(Def) -> - SrcURIs = get_uris(<<"src-uri">>, Def), - SrcX = pget(<<"src-exchange">>,Def, none), - SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] - SrcQ = pget(<<"src-queue">>, Def, none), + SrcURIs = get_uris(<<"src-uri">>, Def), + SrcX = pget(<<"src-exchange">>,Def, none), + SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1] + SrcQ = pget(<<"src-queue">>, Def, none), + SrcQArgs = pget(<<"dest-queue-args">>, Def, #{}), {SrcDeclFun, Queue, DestHeaders} = case SrcQ of none -> {fun (_Conn, Ch) -> @@ -385,7 +394,7 @@ parse_amqp091_source(Def) -> end, <<>>, [{<<"src-exchange">>, SrcX}, {<<"src-exchange-key">>, SrcXKey}]}; _ -> {fun (Conn, _Ch) -> - ensure_queue(Conn, SrcQ) + ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs)) end, SrcQ, [{<<"src-queue">>, SrcQ}]} end, DeleteAfter = pget(<<"src-delete-after">>, Def, @@ -416,15 +425,16 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm; translate_ack_mode(<<"on-publish">>) -> on_publish; translate_ack_mode(<<"no-ack">>) -> no_ack. -ensure_queue(Conn, Queue) -> +ensure_queue(Conn, Queue, XArgs) -> {ok, Ch} = amqp_connection:open_channel(Conn), try amqp_channel:call(Ch, #'queue.declare'{queue = Queue, passive = true}) catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> {ok, Ch2} = amqp_connection:open_channel(Conn), - amqp_channel:call(Ch2, #'queue.declare'{queue = Queue, - durable = true}), + amqp_channel:call(Ch2, #'queue.declare'{queue = Queue, + durable = true, + arguments = XArgs}), catch amqp_channel:close(Ch2) after diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index d542b3a547..fa9144a8c6 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -21,6 +21,7 @@ groups() -> [ {non_parallel_tests, [], [ simple, + quorum_queues, set_properties_using_proplist, set_properties_using_map, set_empty_properties_using_proplist, @@ -80,6 +81,20 @@ simple(Config) -> publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) end). +quorum_queues(Config) -> + with_ch(Config, + fun (Ch) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [ + {<<"src-queue">>, <<"src">>}, + {<<"dest-queue">>, <<"dest">>}, + {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}, + {<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}} + ]), + publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + end). + set_properties_using_map(Config) -> with_ch(Config, fun (Ch) -> |