diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 11:25:08 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-16 11:25:08 +0200 |
| commit | ac27c42e25992242484497afc77f257223edf885 (patch) | |
| tree | 0cddc0a626bb20630724b0b99e48fcb8c67693fe | |
| parent | 5973092f4b6ce12f90b7db531b61e018676afdca (diff) | |
| parent | 6ebdbf4191e43a96bf5e3a6382985918ac211744 (diff) | |
| download | rabbitmq-server-git-ac27c42e25992242484497afc77f257223edf885.tar.gz | |
Merge pull request #2466 from rabbitmq/rabbitmq-server-2296
Prevent quorum and stream queues from being server-named
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_queue_type_util.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_stream_queue.erl | 24 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 9 |
9 files changed, 106 insertions, 70 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e66445443..fd7682fdeb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,8 +56,10 @@ -export([collect_info_all/2]). -export([is_policy_applicable/2]). +-export([is_server_named_allowed/1]). -export([check_max_age/1]). +-export([get_queue_type/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -366,6 +368,10 @@ is_policy_applicable(QName, Policy) -> true end. +is_server_named_allowed(Args) -> + Type = get_queue_type(Args), + rabbit_queue_type:is_server_named_allowed(Type). + -spec lookup (name()) -> rabbit_types:ok(amqqueue:amqqueue()) | diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b3f70d87d7..fb9a428a0e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2400,8 +2400,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin, StrippedQueueNameBin = strip_cr_lf(QueueNameBin), Durable = DurableDeclare andalso not ExclusiveDeclare, ActualNameBin = case StrippedQueueNameBin of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.gen"); + <<>> -> + case rabbit_amqqueue:is_server_named_allowed(Args) of + true -> + rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"); + false -> + rabbit_misc:protocol_error( + precondition_failed, + "Cannot declare a server-named queue for type ~p", + [rabbit_amqqueue:get_queue_type(Args)]) + end; Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 5ab32b8632..be0877c87d 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -38,7 +38,7 @@ dequeue/4, info/2, state_info/1, - is_policy_applicable/2 + capabilities/0 ]). -export([delete_crashed/1, @@ -438,19 +438,23 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, - <<"dead-letter-routing-key">>, <<"max-length">>, - <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, - <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, - <<"single-active-consumer">>, <<"delivery-limit">>, - <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, - <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, - <<"queue-master-locator">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities() -> + #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, + <<"dead-letter-routing-key">>, <<"max-length">>, + <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, + <<"max-priority">>, <<"overflow">>, <<"queue-mode">>, + <<"single-active-consumer">>, <<"delivery-limit">>, + <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>, + <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>, + <<"queue-master-locator">>], + queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>, + <<"x-dead-letter-routing-key">>, <<"x-max-length">>, + <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, + <<"x-max-in-memory-bytes">>, <<"x-max-priority">>, + <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>, + <<"x-queue-type">>], + consumer_arguments => [<<"x-cancel-on-ha-failover">>], + server_named => true}. reject_seq_no(SeqNo, U0) -> reject_seq_no(SeqNo, U0, []). diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 40028c8241..8424ee5d15 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -31,7 +31,8 @@ credit/5, dequeue/5, fold_state/3, - is_policy_applicable/2 + is_policy_applicable/2, + is_server_named_allowed/1 ]). %% temporary @@ -188,8 +189,8 @@ -callback stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. --callback is_policy_applicable(amqqueue:amqqueue(), any()) -> - boolean(). +-callback capabilities() -> + #{atom() := term()}. %% TODO: this should be controlled by a registry that is populated on boot discover(<<"quorum">>) -> @@ -212,6 +213,9 @@ is_enabled(Type) -> rabbit_types:channel_exit(). declare(Q, Node) -> Mod = amqqueue:get_type(Q), + Capabilities = Mod:capabilities(), + ValidQueueArgs = maps:get(queue_arguments, Capabilities, []), + check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs), Mod:declare(Q, Node). -spec delete(amqqueue:amqqueue(), boolean(), @@ -291,8 +295,16 @@ i_down(K, _Q, _DownReason) -> is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), - Mod:is_policy_applicable(Q, Policy). - + Capabilities = Mod:capabilities(), + Applicable = maps:get(policies, Capabilities, []), + lists:all(fun({P, _}) -> + lists:member(P, Applicable) + end, Policy). + +is_server_named_allowed(Type) -> + Capabilities = Type:capabilities(), + maps:get(server_named, Capabilities, false). + -spec init() -> state(). init() -> #?STATE{}. @@ -316,6 +328,9 @@ new(Q, State) when ?is_amqqueue(Q) -> consume(Q, Spec, State) -> #ctx{state = State0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), + Capabilities = Mod:capabilities(), + ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []), + check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs), case Mod:consume(Q, Spec, State0) of {ok, CtxState, Actions} -> return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); @@ -556,3 +571,14 @@ return_ok(State0, Actions0) -> {S, [Act | A0]} end, {State0, []}, Actions0), {ok, State, lists:reverse(Actions)}. + + +check_invalid_arguments(QueueName, Args, Keys) -> + [case lists:member(Arg, Keys) of + true -> ok; + false -> rabbit_misc:protocol_error( + precondition_failed, + "invalid arg '~s' for ~s", + [Arg, rabbit_misc:rs(QueueName)]) + end || {Arg, _, _} <- Args], + ok. diff --git a/src/rabbit_queue_type_util.erl b/src/rabbit_queue_type_util.erl index 0ff7fb312d..68b21749c7 100644 --- a/src/rabbit_queue_type_util.erl +++ b/src/rabbit_queue_type_util.erl @@ -16,8 +16,7 @@ -module(rabbit_queue_type_util). --export([check_invalid_arguments/3, - args_policy_lookup/3, +-export([args_policy_lookup/3, qname_to_internal_name/1, check_auto_delete/1, check_exclusive/1, @@ -26,16 +25,6 @@ -include("rabbit.hrl"). -include("amqqueue.hrl"). -check_invalid_arguments(QueueName, Args, Keys) -> - [case rabbit_misc:table_lookup(Args, Key) of - undefined -> ok; - _TypeVal -> rabbit_misc:protocol_error( - precondition_failed, - "invalid arg '~s' for ~s", - [Key, rabbit_misc:rs(QueueName)]) - end || Key <- Keys], - ok. - args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) -> Args = amqqueue:get_arguments(Q), AName = <<"x-", Name/binary>>, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 1f333bdae1..c0b1aa0965 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -43,7 +43,7 @@ -export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). --export([is_policy_applicable/2]). +-export([capabilities/0]). -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). @@ -134,7 +134,6 @@ declare(Q, _Node) when ?amqqueue_is_quorum(Q) -> Arguments = amqqueue:get_arguments(Q), Opts = amqqueue:get_options(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), - check_invalid_arguments(QName, Arguments), rabbit_queue_type_util:check_auto_delete(Q), rabbit_queue_type_util:check_exclusive(Q), rabbit_queue_type_util:check_non_durable(Q), @@ -331,20 +330,18 @@ filter_quorum_critical(Queues, ReplicaStates) -> length(AllUp) =< MinQuorum end, Queues). --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length">>, - <<"max-length-bytes">>, - <<"overflow">>, - <<"expires">>, - <<"max-in-memory-length">>, - <<"max-in-memory-bytes">>, - <<"delivery-limit">>, - <<"dead-letter-exchange">>, - <<"dead-letter-routing-key">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities() -> + #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>, + <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, + <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], + queue_arguments => [<<"x-expires">>, <<"x-dead-letter-exchange">>, + <<"x-dead-letter-routing-key">>, <<"x-max-length">>, + <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>, + <<"x-max-in-memory-bytes">>, <<"x-overflow">>, + <<"x-single-active-consumer">>, <<"x-queue-type">>, + <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>], + consumer_arguments => [<<"x-priority">>], + server_named => false}. rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), @@ -1416,12 +1413,6 @@ quorum_ctag(Other) -> maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). -check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-message-ttl">>, - <<"x-max-priority">>, - <<"x-queue-mode">>], - rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys). - queue_name(RaFifoState) -> rabbit_fifo_client:cluster_name(RaFifoState). diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 394479a8f2..3395f42a59 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -37,7 +37,7 @@ update/2, state_info/1, stat/1, - is_policy_applicable/2]). + capabilities/0]). -export([set_retention_policy/3]). -export([add_replica/3, @@ -80,7 +80,6 @@ is_enabled() -> declare(Q0, Node) when ?amqqueue_is_stream(Q0) -> Arguments = amqqueue:get_arguments(Q0), QName = amqqueue:get_name(Q0), - check_invalid_arguments(QName, Arguments), rabbit_queue_type_util:check_auto_delete(Q0), rabbit_queue_type_util:check_exclusive(Q0), rabbit_queue_type_util:check_non_durable(Q0), @@ -544,13 +543,6 @@ max_age(Age) -> max_age(Age1, Age2) -> min(rabbit_amqqueue:check_max_age(Age1), rabbit_amqqueue:check_max_age(Age2)). -check_invalid_arguments(QueueName, Args) -> - Keys = [<<"x-expires">>, <<"x-message-ttl">>, - <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>, - <<"x-max-in-memory-length">>, <<"x-max-in-memory-bytes">>, - <<"x-quorum-initial-group-size">>, <<"x-cancel-on-ha-failover">>], - rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys). - queue_name(#resource{virtual_host = VHost, name = Name}) -> Timestamp = erlang:integer_to_binary(erlang:system_time()), osiris_util:to_base64uri(erlang:binary_to_list(<<VHost/binary, "_", Name/binary, "_", @@ -658,9 +650,11 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, <<"x-routing-key">> => {utf8, RKey}}, R0), rabbit_msg_record:to_iodata(R). --spec is_policy_applicable(amqqueue:amqqueue(), any()) -> boolean(). -is_policy_applicable(_Q, Policy) -> - Applicable = [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], - lists:all(fun({P, _}) -> - lists:member(P, Applicable) - end, Policy). +capabilities() -> + #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], + queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, + <<"x-max-length">>, <<"x-max-length-bytes">>, + <<"x-single-active-consumer">>, <<"x-queue-type">>, + <<"x-max-age">>, <<"x-max-segment-size">>], + consumer_arguments => [<<"x-stream-offset">>], + server_named => false}. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index d428cb0701..97b2235411 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -91,6 +91,7 @@ all_tests() -> declare_args, declare_invalid_args, declare_invalid_properties, + declare_server_named, start_queue, stop_queue, restart_queue, @@ -378,6 +379,14 @@ declare_invalid_args(Config) -> LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-quorum-initial-group-size">>, long, 0}])). +declare_server_named(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + <<"">>, [{<<"x-queue-type">>, longstr, <<"quorum">>}])). + start_queue(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index d3dfd60f7c..777384f7ff 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -59,6 +59,7 @@ all_tests() -> declare_max_age, declare_invalid_args, declare_invalid_properties, + declare_server_named, declare_queue, delete_queue, publish, @@ -263,6 +264,14 @@ declare_invalid_args(Config) -> Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). +declare_server_named(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + <<"">>, [{<<"x-queue-type">>, longstr, <<"stream">>}])). + declare_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
