summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2021-04-12 14:14:26 +0100
committerkjnilsson <knilsson@pivotal.io>2021-04-12 14:14:26 +0100
commit432edb11fc8969e9ec7481e2dc44764e1b536b85 (patch)
treea530f0cdb74fee3fbd811eb40b10bde611cd0f97
parente2893f44db081ce378d134e012536293c7c5589d (diff)
downloadrabbitmq-server-git-longer-qq-names.tar.gz
Allow quorum queue names to exceed atom max charslonger-qq-names
If the concatenation of the vhost and the queue name exceeds 255 chars we instead generate an arbitrary atom name instead of throwing an exception.
-rw-r--r--deps/rabbit/src/rabbit_queue_type_util.erl17
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl12
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl26
3 files changed, 44 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_queue_type_util.erl b/deps/rabbit/src/rabbit_queue_type_util.erl
index 0d3aa9ece9..6e970b9ee1 100644
--- a/deps/rabbit/src/rabbit_queue_type_util.erl
+++ b/deps/rabbit/src/rabbit_queue_type_util.erl
@@ -36,11 +36,18 @@ args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) ->
{PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
end.
-%% TODO escape hack
-qname_to_internal_name(#resource{virtual_host = <<"/">>, name = Name}) ->
- erlang:binary_to_atom(<<"%2F_", Name/binary>>, utf8);
-qname_to_internal_name(#resource{virtual_host = VHost, name = Name}) ->
- erlang:binary_to_atom(<<VHost/binary, "_", Name/binary>>, utf8).
+qname_to_internal_name(QName) ->
+ case name_concat(QName) of
+ Name when byte_size(Name) =< 255 ->
+ {ok, erlang:binary_to_atom(Name)};
+ Name ->
+ {error, {too_long, Name}}
+ end.
+
+name_concat(#resource{virtual_host = <<"/">>, name = Name}) ->
+ <<"%2F_", Name/binary>>;
+name_concat(#resource{virtual_host = VHost, name = Name}) ->
+ <<VHost/binary, "_", Name/binary>>.
check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) ->
Name = amqqueue:get_name(Q),
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 67117d36b9..451f5f0f03 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -169,7 +169,12 @@ start_cluster(Q) ->
Opts = amqqueue:get_options(Q),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
QuorumSize = get_default_quorum_initial_group_size(Arguments),
- RaName = qname_to_internal_name(QName),
+ RaName = case qname_to_internal_name(QName) of
+ {ok, A} ->
+ A;
+ {error, {too_long, N}} ->
+ binary_to_atom(ra:new_uid(N))
+ end,
Id = {RaName, node()},
Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)),
NewQ0 = amqqueue:set_pid(Q, Id),
@@ -179,7 +184,8 @@ start_cluster(Q) ->
[QuorumSize, rabbit_misc:rs(QName)]),
case rabbit_amqqueue:internal_declare(NewQ1, false) of
{created, NewQ} ->
- TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval,
+ ?TICK_TIMEOUT),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
|| ServerId <- members(NewQ)],
case ra:start_cluster(?RA_SYSTEM, RaConfs) of
@@ -943,11 +949,11 @@ cluster_state(Name) ->
status(Vhost, QueueName) ->
%% Handle not found queues
QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue},
- RName = qname_to_internal_name(QName),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
+ {RName, _} = amqqueue:get_pid(Q),
Nodes = get_nodes(Q),
[begin
case get_sys_status({RName, N}) of
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 2bfcc7b8a8..92167c319a 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -92,6 +92,7 @@ all_tests() ->
declare_invalid_properties,
declare_server_named,
start_queue,
+ long_name,
stop_queue,
restart_queue,
restart_all_types,
@@ -347,8 +348,6 @@ start_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
LQ = ?config(queue_name, Config),
- %% The stream coordinator is also a ra process, we need to ensure the quorum tests
- %% are not affected by any other ra cluster that could be added in the future
Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])),
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -376,7 +375,28 @@ start_queue(Config) ->
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
?assertMatch(Expected,
- length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))).
+ length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))),
+
+ ok.
+
+
+long_name(Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ %% 64 + chars
+ VHost = <<"long_name_vhost____________________________________">>,
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ User = ?config(rmq_username, Config),
+ ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost, User),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ %% long name
+ LongName = binary:copy(QName, 240 div byte_size(QName)),
+ ?assertEqual({'queue.declare_ok', LongName, 0, 0},
+ declare(Ch, LongName,
+ [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ok.
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),