diff options
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
-rw-r--r-- | src/rabbit_classic_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_type.erl | 15 | ||||
-rw-r--r-- | src/rabbit_quorum_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 7 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 9 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 9 |
8 files changed, 56 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e66445443..fd7682fdeb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,8 +56,10 @@ -export([collect_info_all/2]). -export([is_policy_applicable/2]). +-export([is_server_named_allowed/1]). -export([check_max_age/1]). +-export([get_queue_type/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -366,6 +368,10 @@ is_policy_applicable(QName, Policy) -> true end. +is_server_named_allowed(Args) -> + Type = get_queue_type(Args), + rabbit_queue_type:is_server_named_allowed(Type). + -spec lookup (name()) -> rabbit_types:ok(amqqueue:amqqueue()) | diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b3f70d87d7..fb9a428a0e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2400,8 +2400,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin, StrippedQueueNameBin = strip_cr_lf(QueueNameBin), Durable = DurableDeclare andalso not ExclusiveDeclare, ActualNameBin = case StrippedQueueNameBin of - <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), - "amq.gen"); + <<>> -> + case rabbit_amqqueue:is_server_named_allowed(Args) of + true -> + rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"); + false -> + rabbit_misc:protocol_error( + precondition_failed, + "Cannot declare a server-named queue for type ~p", + [rabbit_amqqueue:get_queue_type(Args)]) + end; Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 275a481af8..be0877c87d 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -38,7 +38,7 @@ dequeue/4, info/2, state_info/1, - capabilities/1 + capabilities/0 ]). -export([delete_crashed/1, @@ -438,7 +438,7 @@ recover_durable_queues(QueuesAndRecoveryTerms) -> [Pid, Error]) || {Pid, Error} <- Failures], [Q || {_, {new, Q}} <- Results]. -capabilities(_Q) -> +capabilities() -> #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>, <<"max-length">>, <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, @@ -453,7 +453,8 @@ capabilities(_Q) -> <<"x-max-in-memory-bytes">>, <<"x-max-priority">>, <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>, <<"x-queue-type">>], - consumer_arguments => [<<"x-cancel-on-ha-failover">>]}. + consumer_arguments => [<<"x-cancel-on-ha-failover">>], + server_named => true}. reject_seq_no(SeqNo, U0) -> reject_seq_no(SeqNo, U0, []). diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 8eb16d1e27..8424ee5d15 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -31,7 +31,8 @@ credit/5, dequeue/5, fold_state/3, - is_policy_applicable/2 + is_policy_applicable/2, + is_server_named_allowed/1 ]). %% temporary @@ -188,7 +189,7 @@ -callback stat(amqqueue:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. --callback capabilities(amqqueue:amqqueue()) -> +-callback capabilities() -> #{atom() := term()}. %% TODO: this should be controlled by a registry that is populated on boot @@ -212,7 +213,7 @@ is_enabled(Type) -> rabbit_types:channel_exit(). declare(Q, Node) -> Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(Q), + Capabilities = Mod:capabilities(), ValidQueueArgs = maps:get(queue_arguments, Capabilities, []), check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs), Mod:declare(Q, Node). @@ -294,11 +295,15 @@ i_down(K, _Q, _DownReason) -> is_policy_applicable(Q, Policy) -> Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(Q), + Capabilities = Mod:capabilities(), Applicable = maps:get(policies, Capabilities, []), lists:all(fun({P, _}) -> lists:member(P, Applicable) end, Policy). + +is_server_named_allowed(Type) -> + Capabilities = Type:capabilities(), + maps:get(server_named, Capabilities, false). -spec init() -> state(). init() -> @@ -323,7 +328,7 @@ new(Q, State) when ?is_amqqueue(Q) -> consume(Q, Spec, State) -> #ctx{state = State0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), - Capabilities = Mod:capabilities(Q), + Capabilities = Mod:capabilities(), ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []), check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs), case Mod:consume(Q, Spec, State0) of diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index d4c9dd1d9c..c0b1aa0965 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -43,7 +43,7 @@ -export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0, filter_quorum_critical/1, filter_quorum_critical/2, all_replica_states/0]). --export([capabilities/1]). +-export([capabilities/0]). -export([repair_amqqueue_nodes/1, repair_amqqueue_nodes/2 ]). @@ -330,7 +330,7 @@ filter_quorum_critical(Queues, ReplicaStates) -> length(AllUp) =< MinQuorum end, Queues). -capabilities(_Q) -> +capabilities() -> #{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>, <<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>, <<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>], @@ -340,7 +340,8 @@ capabilities(_Q) -> <<"x-max-in-memory-bytes">>, <<"x-overflow">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>], - consumer_arguments => [<<"x-priority">>]}. + consumer_arguments => [<<"x-priority">>], + server_named => false}. rpc_delete_metrics(QName) -> ets:delete(queue_coarse_metrics, QName), diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index e251fe0117..ed44f1258a 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -37,7 +37,7 @@ update/2, state_info/1, stat/1, - capabilities/1]). + capabilities/0]). -export([set_retention_policy/3]). -export([add_replica/3, @@ -649,10 +649,11 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, <<"x-routing-key">> => {utf8, RKey}}, R0), rabbit_msg_record:to_iodata(R). -capabilities(_Q) -> +capabilities() -> #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>], queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, <<"x-max-age">>, <<"x-max-segment-size">>], - consumer_arguments => [<<"x-stream-offset">>]}. + consumer_arguments => [<<"x-stream-offset">>], + server_named => false}. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index d428cb0701..97b2235411 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -91,6 +91,7 @@ all_tests() -> declare_args, declare_invalid_args, declare_invalid_properties, + declare_server_named, start_queue, stop_queue, restart_queue, @@ -378,6 +379,14 @@ declare_invalid_args(Config) -> LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-quorum-initial-group-size">>, long, 0}])). +declare_server_named(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + <<"">>, [{<<"x-queue-type">>, longstr, <<"quorum">>}])). + start_queue(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index 67ca8eba8b..cb38e15ea7 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -59,6 +59,7 @@ all_tests() -> declare_max_age, declare_invalid_args, declare_invalid_properties, + declare_server_named, declare_queue, delete_queue, publish, @@ -263,6 +264,14 @@ declare_invalid_args(Config) -> Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])). +declare_server_named(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + <<"">>, [{<<"x-queue-type">>, longstr, <<"stream">>}])). + declare_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |