summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl111
-rw-r--r--deps/rabbit/src/rabbit_parameter_validation.erl15
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl2
-rw-r--r--deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl48
-rw-r--r--deps/rabbitmq_shovel/test/dynamic_SUITE.erl15
5 files changed, 152 insertions, 39 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index a8422b3da3..6a5903f679 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -55,7 +55,7 @@
-export([rebalance/3]).
-export([collect_info_all/2]).
--export([is_policy_applicable/2]).
+-export([is_policy_applicable/2, declare_args/0]).
-export([is_server_named_allowed/1]).
-export([check_max_age/1]).
@@ -794,16 +794,29 @@ check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
- end.
+ end;
+check_int_arg(Val, _) when is_integer(Val) ->
+ ok;
+check_int_arg(_Val, _) ->
+ {error, {unacceptable_type, "expected integer"}}.
check_bool_arg({bool, _}, _) -> ok;
-check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
+check_bool_arg(true, _) -> ok;
+check_bool_arg(false, _) -> ok;
+check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}.
check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
Error -> Error
+ end;
+check_non_neg_int_arg(Val, Args) ->
+ case check_int_arg(Val, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
end.
check_expires_arg({Type, Val}, Args) ->
@@ -811,12 +824,23 @@ check_expires_arg({Type, Val}, Args) ->
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
+ end;
+check_expires_arg(Val, Args) ->
+ case check_int_arg(Val, Args) of
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
+ end;
+check_message_ttl_arg(Val, Args) ->
+ case check_int_arg(Val, Args) of
+ ok -> rabbit_misc:check_expiry(Val);
+ Error -> Error
end.
check_max_priority_arg({Type, Val}, Args) ->
@@ -824,19 +848,30 @@ check_max_priority_arg({Type, Val}, Args) ->
ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
ok -> {error, {max_value_exceeded, Val}};
Error -> Error
+ end;
+check_max_priority_arg(Val, Args) ->
+ case check_non_neg_int_arg(Val, Args) of
+ ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
+ ok -> {error, {max_value_exceeded, Val}};
+ Error -> Error
end.
check_single_active_consumer_arg({Type, Val}, Args) ->
- case check_bool_arg({Type, Val}, Args) of
- ok -> ok;
- Error -> Error
- end.
+ check_bool_arg({Type, Val}, Args);
+check_single_active_consumer_arg(Val, Args) ->
+ check_bool_arg(Val, Args).
check_initial_cluster_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
Error -> Error
+ end;
+check_initial_cluster_size_arg(Val, Args) ->
+ case check_non_neg_int_arg(Val, Args) of
+ ok when Val == 0 -> {error, {value_zero, Val}};
+ ok -> ok;
+ Error -> Error
end.
check_max_age_arg({longstr, Val}, _Args) ->
@@ -884,7 +919,9 @@ unit_value_in_ms("s") ->
%% Note that the validity of x-dead-letter-exchange is already verified
%% by rabbit_channel's queue.declare handler.
check_dlxname_arg({longstr, _}, _) -> ok;
-check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
+check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
+check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok;
+check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}.
check_dlxrk_arg({longstr, _}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
@@ -892,7 +929,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
_ -> ok
end;
check_dlxrk_arg({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+ {error, {unacceptable_type, Type}};
+check_dlxrk_arg(Val, Args) when is_binary(Val) ->
+ case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
+ undefined -> {error, routing_key_but_no_dlx_defined};
+ _ -> ok
+ end;
+check_dlxrk_arg(_Val, _Args) ->
+ {error, {unacceptable_type, "expected a string"}}.
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, [<<"drop-head">>,
@@ -902,33 +946,64 @@ check_overflow({longstr, Val}, _Args) ->
false -> {error, invalid_overflow}
end;
check_overflow({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+ {error, {unacceptable_type, Type}};
+check_overflow(Val, _Args) ->
+ case lists:member(Val, [<<"drop-head">>,
+ <<"reject-publish">>,
+ <<"reject-publish-dlx">>]) of
+ true -> ok;
+ false -> {error, invalid_overflow}
+ end;
+check_overflow(_Val, _Args) ->
+ {error, invalid_overflow}.
+-define(KNOWN_LEADER_LOCATORS, [<<"client-local">>, <<"random">>, <<"least-leaders">>]).
check_queue_leader_locator_arg({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"client-local">>,
- <<"random">>,
- <<"least-leaders">>]) of
+ case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
true -> ok;
false -> {error, invalid_queue_locator_arg}
end;
check_queue_leader_locator_arg({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+ {error, {unacceptable_type, Type}};
+check_queue_leader_locator_arg(Val, _Args) when is_binary(Val) ->
+ case lists:member(Val, ?KNOWN_LEADER_LOCATORS) of
+ true -> ok;
+ false -> {error, invalid_queue_locator_arg}
+ end;
+check_queue_leader_locator_arg(_Val, _Args) ->
+ {error, invalid_queue_locator_arg}.
+-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
check_queue_mode({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"default">>, <<"lazy">>]) of
+ case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, invalid_queue_mode}
end;
check_queue_mode({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+ {error, {unacceptable_type, Type}};
+check_queue_mode(Val, _Args) when is_binary(Val) ->
+ case lists:member(Val, ?KNOWN_QUEUE_MODES) of
+ true -> ok;
+ false -> {error, invalid_queue_mode}
+ end;
+check_queue_mode(_Val, _Args) ->
+ {error, invalid_queue_mode}.
+-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>, <<"stream">>]).
check_queue_type({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"classic">>, <<"quorum">>, <<"stream">>]) of
+ case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
true -> ok;
false -> {error, invalid_queue_type}
end;
check_queue_type({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
+ {error, {unacceptable_type, Type}};
+check_queue_type(Val, _Args) when is_binary(Val) ->
+ case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
+ true -> ok;
+ false -> {error, invalid_queue_type}
+ end;
+check_queue_type(_Val, _Args) ->
+ {error, invalid_queue_type}.
-spec list() -> [amqqueue:amqqueue()].
diff --git a/deps/rabbit/src/rabbit_parameter_validation.erl b/deps/rabbit/src/rabbit_parameter_validation.erl
index 0fc274a06b..6c0852476a 100644
--- a/deps/rabbit/src/rabbit_parameter_validation.erl
+++ b/deps/rabbit/src/rabbit_parameter_validation.erl
@@ -50,7 +50,20 @@ regex(Name, Term) ->
proplist(Name, Constraints, Term) when is_list(Term) ->
{Results, Remainder}
= lists:foldl(
- fun ({Key, Fun, Needed}, {Results0, Term0}) ->
+ %% if the optional/mandatory flag is not provided in a constraint tuple,
+ %% assume 'optional'
+ fun ({Key, Fun}, {Results0, Term0}) ->
+ case lists:keytake(Key, 1, Term0) of
+ {value, {Key, Value}, Term1} ->
+ {[Fun(Key, Value) | Results0],
+ Term1};
+ {value, {Key, Type, Value}, Term1} ->
+ {[Fun(Key, Type, Value) | Results0],
+ Term1};
+ false ->
+ {Results0, Term0}
+ end;
+ ({Key, Fun, Needed}, {Results0, Term0}) ->
case {lists:keytake(Key, 1, Term0), Needed} of
{{value, {Key, Value}, Term1}, _} ->
{[Fun(Key, Value) | Results0],
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
index 8de6a3ecb1..a9be15914f 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
@@ -48,7 +48,7 @@
ack_mode => ack_mode(),
atom() => term()}.
--export_type([state/0, source_config/0, dest_config/0, uri/0]).
+-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]).
-callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) ->
source_config() | dest_config().
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
index c199e30859..fae76be256 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
@@ -116,12 +116,13 @@ amqp10_src_validation(_Def, User) ->
amqp091_src_validation(_Def, User) ->
[
- {<<"src-uri">>, validate_uri_fun(User), mandatory},
- {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
- {<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
- {<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional},
- {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
- {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
+ {<<"src-uri">>, validate_uri_fun(User), mandatory},
+ {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"src-queue-args">>, fun validate_queue_args/2, optional},
+ {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
+ {<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
%% a deprecated pre-3.7 setting
{<<"delete-after">>, fun validate_delete_after/2, optional},
%% currently used multi-protocol friend name, introduced in 3.7
@@ -151,6 +152,7 @@ amqp091_dest_validation(_Def, User) ->
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-queue-args">>, fun validate_queue_args/2, optional},
{<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
@@ -206,6 +208,11 @@ validate_delete_after(Name, Term) ->
{error, "~s should be number, \"never\" or \"queue-length\", actually was "
"~p", [Name, Term]}.
+validate_queue_args(Name, Term0) ->
+ Term = rabbit_data_coercion:to_proplist(Term0),
+
+ rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term).
+
validate_amqp10_map(Name, Terms0) ->
Terms = rabbit_data_coercion:to_proplist(Terms0),
Str = fun rabbit_parameter_validation:binary/2,
@@ -292,14 +299,15 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
}.
parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
- DestURIs = get_uris(<<"dest-uri">>, Def),
- DestX = pget(<<"dest-exchange">>, Def, none),
- DestXKey = pget(<<"dest-exchange-key">>, Def, none),
- DestQ = pget(<<"dest-queue">>, Def, none),
+ DestURIs = get_uris(<<"dest-uri">>, Def),
+ DestX = pget(<<"dest-exchange">>, Def, none),
+ DestXKey = pget(<<"dest-exchange-key">>, Def, none),
+ DestQ = pget(<<"dest-queue">>, Def, none),
+ DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
DestDeclFun = fun (Conn, _Ch) ->
case DestQ of
none -> ok;
- _ -> ensure_queue(Conn, DestQ)
+ _ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end
end,
{X, Key} = case DestQ of
@@ -371,10 +379,11 @@ parse_amqp10_source(Def) ->
prefetch_count => PrefetchCount}, Headers}.
parse_amqp091_source(Def) ->
- SrcURIs = get_uris(<<"src-uri">>, Def),
- SrcX = pget(<<"src-exchange">>,Def, none),
- SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
- SrcQ = pget(<<"src-queue">>, Def, none),
+ SrcURIs = get_uris(<<"src-uri">>, Def),
+ SrcX = pget(<<"src-exchange">>,Def, none),
+ SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
+ SrcQ = pget(<<"src-queue">>, Def, none),
+ SrcQArgs = pget(<<"dest-queue-args">>, Def, #{}),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {fun (_Conn, Ch) ->
@@ -385,7 +394,7 @@ parse_amqp091_source(Def) ->
end, <<>>, [{<<"src-exchange">>, SrcX},
{<<"src-exchange-key">>, SrcXKey}]};
_ -> {fun (Conn, _Ch) ->
- ensure_queue(Conn, SrcQ)
+ ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs))
end, SrcQ, [{<<"src-queue">>, SrcQ}]}
end,
DeleteAfter = pget(<<"src-delete-after">>, Def,
@@ -416,15 +425,16 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
translate_ack_mode(<<"no-ack">>) -> no_ack.
-ensure_queue(Conn, Queue) ->
+ensure_queue(Conn, Queue, XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true})
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
{ok, Ch2} = amqp_connection:open_channel(Conn),
- amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
- durable = true}),
+ amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
+ durable = true,
+ arguments = XArgs}),
catch amqp_channel:close(Ch2)
after
diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
index d542b3a547..fa9144a8c6 100644
--- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
+++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl
@@ -21,6 +21,7 @@ groups() ->
[
{non_parallel_tests, [], [
simple,
+ quorum_queues,
set_properties_using_proplist,
set_properties_using_map,
set_empty_properties_using_proplist,
@@ -80,6 +81,20 @@ simple(Config) ->
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).
+quorum_queues(Config) ->
+ with_ch(Config,
+ fun (Ch) ->
+ shovel_test_utils:set_param(
+ Config,
+ <<"test">>, [
+ {<<"src-queue">>, <<"src">>},
+ {<<"dest-queue">>, <<"dest">>},
+ {<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}},
+ {<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}
+ ]),
+ publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
+ end).
+
set_properties_using_map(Config) ->
with_ch(Config,
fun (Ch) ->