summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_classic_queue.erl7
-rw-r--r--src/rabbit_queue_type.erl15
-rw-r--r--src/rabbit_quorum_queue.erl7
-rw-r--r--src/rabbit_stream_queue.erl7
-rw-r--r--test/quorum_queue_SUITE.erl9
-rw-r--r--test/rabbit_stream_queue_SUITE.erl9
8 files changed, 56 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e66445443..fd7682fdeb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,8 +56,10 @@
-export([collect_info_all/2]).
-export([is_policy_applicable/2]).
+-export([is_server_named_allowed/1]).
-export([check_max_age/1]).
+-export([get_queue_type/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -366,6 +368,10 @@ is_policy_applicable(QName, Policy) ->
true
end.
+is_server_named_allowed(Args) ->
+ Type = get_queue_type(Args),
+ rabbit_queue_type:is_server_named_allowed(Type).
+
-spec lookup
(name()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b3f70d87d7..fb9a428a0e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2400,8 +2400,16 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
Durable = DurableDeclare andalso not ExclusiveDeclare,
ActualNameBin = case StrippedQueueNameBin of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.gen");
+ <<>> ->
+ case rabbit_amqqueue:is_server_named_allowed(Args) of
+ true ->
+ rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen");
+ false ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot declare a server-named queue for type ~p",
+ [rabbit_amqqueue:get_queue_type(Args)])
+ end;
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 275a481af8..be0877c87d 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -38,7 +38,7 @@
dequeue/4,
info/2,
state_info/1,
- capabilities/1
+ capabilities/0
]).
-export([delete_crashed/1,
@@ -438,7 +438,7 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
[Pid, Error]) || {Pid, Error} <- Failures],
[Q || {_, {new, Q}} <- Results].
-capabilities(_Q) ->
+capabilities() ->
#{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
<<"dead-letter-routing-key">>, <<"max-length">>,
<<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
@@ -453,7 +453,8 @@ capabilities(_Q) ->
<<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
<<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
<<"x-queue-type">>],
- consumer_arguments => [<<"x-cancel-on-ha-failover">>]}.
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>],
+ server_named => true}.
reject_seq_no(SeqNo, U0) ->
reject_seq_no(SeqNo, U0, []).
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 8eb16d1e27..8424ee5d15 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -31,7 +31,8 @@
credit/5,
dequeue/5,
fold_state/3,
- is_policy_applicable/2
+ is_policy_applicable/2,
+ is_server_named_allowed/1
]).
%% temporary
@@ -188,7 +189,7 @@
-callback stat(amqqueue:amqqueue()) ->
{'ok', non_neg_integer(), non_neg_integer()}.
--callback capabilities(amqqueue:amqqueue()) ->
+-callback capabilities() ->
#{atom() := term()}.
%% TODO: this should be controlled by a registry that is populated on boot
@@ -212,7 +213,7 @@ is_enabled(Type) ->
rabbit_types:channel_exit().
declare(Q, Node) ->
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(Q),
+ Capabilities = Mod:capabilities(),
ValidQueueArgs = maps:get(queue_arguments, Capabilities, []),
check_invalid_arguments(amqqueue:get_name(Q), amqqueue:get_arguments(Q), ValidQueueArgs),
Mod:declare(Q, Node).
@@ -294,11 +295,15 @@ i_down(K, _Q, _DownReason) ->
is_policy_applicable(Q, Policy) ->
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(Q),
+ Capabilities = Mod:capabilities(),
Applicable = maps:get(policies, Capabilities, []),
lists:all(fun({P, _}) ->
lists:member(P, Applicable)
end, Policy).
+
+is_server_named_allowed(Type) ->
+ Capabilities = Type:capabilities(),
+ maps:get(server_named, Capabilities, false).
-spec init() -> state().
init() ->
@@ -323,7 +328,7 @@ new(Q, State) when ?is_amqqueue(Q) ->
consume(Q, Spec, State) ->
#ctx{state = State0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
- Capabilities = Mod:capabilities(Q),
+ Capabilities = Mod:capabilities(),
ValidConsumerArgs = maps:get(consumer_arguments, Capabilities, []),
check_invalid_arguments(amqqueue:get_name(Q), maps:get(args, Spec), ValidConsumerArgs),
case Mod:consume(Q, Spec, State0) of
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index d4c9dd1d9c..c0b1aa0965 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -43,7 +43,7 @@
-export([list_with_minimum_quorum/0, list_with_minimum_quorum_for_cli/0,
filter_quorum_critical/1, filter_quorum_critical/2,
all_replica_states/0]).
--export([capabilities/1]).
+-export([capabilities/0]).
-export([repair_amqqueue_nodes/1,
repair_amqqueue_nodes/2
]).
@@ -330,7 +330,7 @@ filter_quorum_critical(Queues, ReplicaStates) ->
length(AllUp) =< MinQuorum
end, Queues).
-capabilities(_Q) ->
+capabilities() ->
#{policies => [<<"max-length">>, <<"max-length-bytes">>, <<"overflow">>,
<<"expires">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
<<"delivery-limit">>, <<"dead-letter-exchange">>, <<"dead-letter-routing-key">>],
@@ -340,7 +340,8 @@ capabilities(_Q) ->
<<"x-max-in-memory-bytes">>, <<"x-overflow">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-quorum-initial-group-size">>, <<"x-delivery-limit">>],
- consumer_arguments => [<<"x-priority">>]}.
+ consumer_arguments => [<<"x-priority">>],
+ server_named => false}.
rpc_delete_metrics(QName) ->
ets:delete(queue_coarse_metrics, QName),
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index e251fe0117..ed44f1258a 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -37,7 +37,7 @@
update/2,
state_info/1,
stat/1,
- capabilities/1]).
+ capabilities/0]).
-export([set_retention_policy/3]).
-export([add_replica/3,
@@ -649,10 +649,11 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
<<"x-routing-key">> => {utf8, RKey}}, R0),
rabbit_msg_record:to_iodata(R).
-capabilities(_Q) ->
+capabilities() ->
#{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
<<"x-max-age">>, <<"x-max-segment-size">>],
- consumer_arguments => [<<"x-stream-offset">>]}.
+ consumer_arguments => [<<"x-stream-offset">>],
+ server_named => false}.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index d428cb0701..97b2235411 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -91,6 +91,7 @@ all_tests() ->
declare_args,
declare_invalid_args,
declare_invalid_properties,
+ declare_server_named,
start_queue,
stop_queue,
restart_queue,
@@ -378,6 +379,14 @@ declare_invalid_args(Config) ->
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-quorum-initial-group-size">>, long, 0}])).
+declare_server_named(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ <<"">>, [{<<"x-queue-type">>, longstr, <<"quorum">>}])).
+
start_queue(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index 67ca8eba8b..cb38e15ea7 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -59,6 +59,7 @@ all_tests() ->
declare_max_age,
declare_invalid_args,
declare_invalid_properties,
+ declare_server_named,
declare_queue,
delete_queue,
publish,
@@ -263,6 +264,14 @@ declare_invalid_args(Config) ->
Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])).
+declare_server_named(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ <<"">>, [{<<"x-queue-type">>, longstr, <<"stream">>}])).
+
declare_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),