summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-16 11:25:08 +0200
committerGitHub <noreply@github.com>2020-10-16 11:25:08 +0200
commitac27c42e25992242484497afc77f257223edf885 (patch)
tree0cddc0a626bb20630724b0b99e48fcb8c67693fe
parent5973092f4b6ce12f90b7db531b61e018676afdca (diff)
parent6ebdbf4191e43a96bf5e3a6382985918ac211744 (diff)
downloadrabbitmq-server-git-ac27c42e25992242484497afc77f257223edf885.tar.gz
Merge pull request #2466 from rabbitmq/rabbitmq-server-2296
Prevent quorum and stream queues from being server-named
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_classic_queue.erl32
-rw-r--r--src/rabbit_queue_type.erl36
-rw-r--r--src/rabbit_queue_type_util.erl13
-rw-r--r--src/rabbit_quorum_queue.erl35
-rw-r--r--src/rabbit_stream_queue.erl24
-rw-r--r--test/quorum_queue_SUITE.erl9
-rw-r--r--test/rabbit_stream_queue_SUITE.erl9
9 files changed, 106 insertions, 70 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e66445443..fd7682fdeb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,8 +56,10 @@
-export([collect_info_all/2]).
-export([is_policy_applicable/2]).
+-export([is_server_named_allowed/1]).
-export([check_max_age/1]).
+-export([get_queue_type/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -366,6 +368,10 @@ is_policy_applicable(QName, Policy) ->
true
end.
+is_server_named_allowed(Args) ->
+ Type = get_queue_type(Args),
+ rabbit_queue_type:is_server_named_allowed(Type).
+
-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b3f70d87d7..fb9a428a0e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2400,8 +2400,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
Durable = DurableDeclare andalso not ExclusiveDeclare,
ActualNameBin = case StrippedQueueNameBin of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.gen");
+ <<>> ->
+ case rabbit_amqqueue:is_server_named_allowed(Args) of
+ true ->
+ rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen");
+ false ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot declare a server-named queue for type ~p",
+ [rabbit_amqqueue:get_queue_type(Args)])
+ end;
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 5ab32b8632..be0877c87d 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -38,7 +38,7 @@
dequeue/4,
info/2,
state_info/1,
- is_policy_applicable/2
+ capabilities/0
]).
-export([delete_crashed/1,
@@ -438,19 +438,23 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
- <<"dead-letter-routing-key">>, <<"max-length">>,
- <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
- <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
- <<"single-active-consumer">>, <<"delivery-limit">>,
- <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
- <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
- <<"queue-master-locator">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities() ->
+ #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>, <<"max-length">>,
+ <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
+ <<"single-active-consumer">>, <<"delivery-limit">>,
+ <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
+ <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
+ <<"queue-master-locator">>],
+ queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
+ <<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
+ <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
+ <<"x-queue-type">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>],
+ server_named => true}.
reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 40028c8241..8424ee5d15 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -31,7 +31,8 @@
credit/5,
dequeue/5,
fold_state/3,
- is_policy_applicable/2
+ is_policy_applicable/2,
+ is_server_named_allowed/1
]).
%% temporary
@@ -188,8 +189,8 @@
-callback stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
--callback is_policy_applicable(amqqueue:amqqueue(), any()) ->
- boolean().
+-callback capabilities() ->
+ #{atom() := term()}.
%% TODO: this should be controlled by a registry that is populated on boot
discover(<<"quorum">>) ->
@@ -212,6 +213,9 @@ is_enabled(Type) ->
rabbit_types:channel_exit().
declare(Q, Node) ->
Mod = amqqueue:get_type(Q),
+ Capabilities = Mod:capabilities(),
+ ValidQueueArgs = maps:get(queue_arguments, Capabilities, []),
+ check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs),
Mod:declare(Q, Node).
-spec delete(amqqueue:amqqueue(), boolean(),
@@ -291,8 +295,16 @@ i_down(K, _Q, _DownReason) ->
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
- Mod:is_policy_applicable(Q, Policy).
-
+ Capabilities = Mod:capabilities(),
+ Applicable = maps:get(policies, Capabilities, []),
+ lists:all(fun({P, _}) ->
+ lists:member(P, Applicable)
+ end, Policy).
+
+is_server_named_allowed(Type) ->
+ Capabilities = Type:capabilities(),
+ maps:get(server_named, Capabilities, false).
+
-spec init() -> state().
init() ->
#?STATE{}.
@@ -316,6 +328,9 @@ new(Q, State) when ?is_amqqueue(Q) ->
consume(Q, Spec, State) ->
#ctx{state = State0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
+ Capabilities = Mod:capabilities(),
+ ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []),
+ check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs),
case Mod:consume(Q, Spec, State0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
@@ -556,3 +571,14 @@ return_ok(State0, Actions0) ->
{S, [Act | A0]}
end, {State0, []}, Actions0),
{ok, State, lists:reverse(Actions)}.
+
+
+check_invalid_arguments(QueueName, Args, Keys) ->
+ [case lists:member(Arg, Keys) of
+ true -> ok;
+ false -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg '~s' for ~s",
+ [Arg, rabbit_misc:rs(QueueName)])
+ end || {Arg, _, _} <- Args],
+ ok.
diff --git a/src/rabbit_queue_type_util.erl b/src/rabbit_queue_type_util.erl
index 0ff7fb312d..68b21749c7 100644
--- a/src/rabbit_queue_type_util.erl
+++ b/src/rabbit_queue_type_util.erl
@@ -16,8 +16,7 @@
-module(rabbit_queue_type_util).
--export([check_invalid_arguments/3,
- args_policy_lookup/3,
+-export([args_policy_lookup/3,
qname_to_internal_name/1,
check_auto_delete/1,
check_exclusive/1,
@@ -26,16 +25,6 @@
-include("rabbit.hrl").
-include("amqqueue.hrl").
-check_invalid_arguments(QueueName, Args, Keys) ->
- [case rabbit_misc:table_lookup(Args, Key) of
- undefined -> ok;
- _TypeVal -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s",
- [Key, rabbit_misc:rs(QueueName)])
- end || Key <- Keys],
- ok.
-
args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) ->
Args = amqqueue:get_arguments(Q),
AName = <<"x-", Name/binary>>,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 1f333bdae1..c0b1aa0965 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -43,7 +43,7 @@
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
--export([is_policy_applicable/2]).
+-export([capabilities/0]).
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
@@ -134,7 +134,6 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) ->
Arguments = amqqueue:get_arguments(Q),
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
- check_invalid_arguments(QName, Arguments),
rabbit_queue_type_util:check_auto_delete(Q),
rabbit_queue_type_util:check_exclusive(Q),
rabbit_queue_type_util:check_non_durable(Q),
@@ -331,20 +330,18 @@ filter_quorum_critical(Queues, ReplicaStates) ->
length(AllUp) =< MinQuorum
end, Queues).
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length">>,
- <<"max-length-bytes">>,
- <<"overflow">>,
- <<"expires">>,
- <<"max-in-memory-length">>,
- <<"max-in-memory-bytes">>,
- <<"delivery-limit">>,
- <<"dead-letter-exchange">>,
- <<"dead-letter-routing-key">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities() ->
+ #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
+ <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
+ queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
+ <<"x-max-in-memory-bytes">>, <<"x-overflow">>,
+ <<"x-single-active-consumer">>, <<"x-queue-type">>,
+ <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
+ consumer_arguments => [<<"x-priority">>],
+ server_named => false}.
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
@@ -1416,12 +1413,6 @@ quorum_ctag(Other) ->
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
-check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-message-ttl">>,
- <<"x-max-priority">>,
- <<"x-queue-mode">>],
- rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys).
-
queue_name(RaFifoState) ->
rabbit_fifo_client:cluster_name(RaFifoState).
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 394479a8f2..3395f42a59 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -37,7 +37,7 @@
update/2,
state_info/1,
stat/1,
- is_policy_applicable/2]).
+ capabilities/0]).
-export([set_retention_policy/3]).
-export([add_replica/3,
@@ -80,7 +80,6 @@ is_enabled() ->
declare(Q0, Node) when ?amqqueue_is_stream(Q0) ->
Arguments = amqqueue:get_arguments(Q0),
QName = amqqueue:get_name(Q0),
- check_invalid_arguments(QName, Arguments),
rabbit_queue_type_util:check_auto_delete(Q0),
rabbit_queue_type_util:check_exclusive(Q0),
rabbit_queue_type_util:check_non_durable(Q0),
@@ -544,13 +543,6 @@ max_age(Age) ->
max_age(Age1, Age2) ->
min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)).
-check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>,
- <<"x-max-in-memory-length">>, <<"x-max-in-memory-bytes">>,
- <<"x-quorum-initial-group-size">>, <<"x-cancel-on-ha-failover">>],
- rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys).
-
queue_name(#resource{virtual_host = VHost, name = Name}) ->
Timestamp = erlang:integer_to_binary(erlang:system_time()),
osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_",
@@ -658,9 +650,11 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
<<"x-routing-key">> => {utf8, RKey}}, R0),
rabbit_msg_record:to_iodata(R).
--spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean().
-is_policy_applicable(_Q, Policy) ->
- Applicable = [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
- lists:all(fun({P, _}) ->
- lists:member(P, Applicable)
- end, Policy).
+capabilities() ->
+ #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
+ queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
+ <<"x-max-length">>, <<"x-max-length-bytes">>,
+ <<"x-single-active-consumer">>, <<"x-queue-type">>,
+ <<"x-max-age">>, <<"x-max-segment-size">>],
+ consumer_arguments => [<<"x-stream-offset">>],
+ server_named => false}.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index d428cb0701..97b2235411 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -91,6 +91,7 @@ all_tests() ->
declare_args,
declare_invalid_args,
declare_invalid_properties,
+ declare_server_named,
start_queue,
stop_queue,
restart_queue,
@@ -378,6 +379,14 @@ declare_invalid_args(Config) ->
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 0}])).
+declare_server_named(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ <<"">>, [{<<"x-queue-type">>, longstr, <<"quorum">>}])).
+
start_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index d3dfd60f7c..777384f7ff 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -59,6 +59,7 @@ all_tests() ->
declare_max_age,
declare_invalid_args,
declare_invalid_properties,
+ declare_server_named,
declare_queue,
delete_queue,
publish,
@@ -263,6 +264,14 @@ declare_invalid_args(Config) ->
Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])).
+declare_server_named(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ <<"">>, [{<<"x-queue-type">>, longstr, <<"stream">>}])).
+
declare_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),