diff options
author | kjnilsson <knilsson@pivotal.io> | 2021-04-12 14:14:26 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2021-04-12 14:14:26 +0100 |
commit | 432edb11fc8969e9ec7481e2dc44764e1b536b85 (patch) | |
tree | a530f0cdb74fee3fbd811eb40b10bde611cd0f97 | |
parent | e2893f44db081ce378d134e012536293c7c5589d (diff) | |
download | rabbitmq-server-git-longer-qq-names.tar.gz |
Allow quorum queue names to exceed atom max charslonger-qq-names
If the concatenation of the vhost and the queue name exceeds 255 chars
we instead generate an arbitrary atom name instead of throwing an
exception.
-rw-r--r-- | deps/rabbit/src/rabbit_queue_type_util.erl | 17 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 12 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 26 |
3 files changed, 44 insertions, 11 deletions
diff --git a/deps/rabbit/src/rabbit_queue_type_util.erl b/deps/rabbit/src/rabbit_queue_type_util.erl index 0d3aa9ece9..6e970b9ee1 100644 --- a/deps/rabbit/src/rabbit_queue_type_util.erl +++ b/deps/rabbit/src/rabbit_queue_type_util.erl @@ -36,11 +36,18 @@ args_policy_lookup(Name, Resolve, Q) when ?is_amqqueue(Q) -> {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal) end. -%% TODO escape hack -qname_to_internal_name(#resource{virtual_host = <<"/">>, name = Name}) -> - erlang:binary_to_atom(<<"%2F_", Name/binary>>, utf8); -qname_to_internal_name(#resource{virtual_host = VHost, name = Name}) -> - erlang:binary_to_atom(<<VHost/binary, "_", Name/binary>>, utf8). +qname_to_internal_name(QName) -> + case name_concat(QName) of + Name when byte_size(Name) =< 255 -> + {ok, erlang:binary_to_atom(Name)}; + Name -> + {error, {too_long, Name}} + end. + +name_concat(#resource{virtual_host = <<"/">>, name = Name}) -> + <<"%2F_", Name/binary>>; +name_concat(#resource{virtual_host = VHost, name = Name}) -> + <<VHost/binary, "_", Name/binary>>. check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) -> Name = amqqueue:get_name(Q), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 67117d36b9..451f5f0f03 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -169,7 +169,12 @@ start_cluster(Q) -> Opts = amqqueue:get_options(Q), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), QuorumSize = get_default_quorum_initial_group_size(Arguments), - RaName = qname_to_internal_name(QName), + RaName = case qname_to_internal_name(QName) of + {ok, A} -> + A; + {error, {too_long, N}} -> + binary_to_atom(ra:new_uid(N)) + end, Id = {RaName, node()}, Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)), NewQ0 = amqqueue:set_pid(Q, Id), @@ -179,7 +184,8 @@ start_cluster(Q) -> [QuorumSize, rabbit_misc:rs(QName)]), case rabbit_amqqueue:internal_declare(NewQ1, false) of {created, NewQ} -> - TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), + TickTimeout = application:get_env(rabbit, quorum_tick_interval, + ?TICK_TIMEOUT), RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout) || ServerId <- members(NewQ)], case ra:start_cluster(?RA_SYSTEM, RaConfs) of @@ -943,11 +949,11 @@ cluster_state(Name) -> status(Vhost, QueueName) -> %% Handle not found queues QName = #resource{virtual_host = Vhost, name = QueueName, kind = queue}, - RName = qname_to_internal_name(QName), case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> + {RName, _} = amqqueue:get_pid(Q), Nodes = get_nodes(Q), [begin case get_sys_status({RName, N}) of diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2bfcc7b8a8..92167c319a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -92,6 +92,7 @@ all_tests() -> declare_invalid_properties, declare_server_named, start_queue, + long_name, stop_queue, restart_queue, restart_all_types, @@ -347,8 +348,6 @@ start_queue(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), LQ = ?config(queue_name, Config), - %% The stream coordinator is also a ra process, we need to ensure the quorum tests - %% are not affected by any other ra cluster that could be added in the future Children = length(rpc:call(Server, supervisor, which_children, [?SUPNAME])), ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -376,7 +375,28 @@ start_queue(Config) -> ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), ?assertMatch(Expected, - length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))). + length(rpc:call(Server, supervisor, which_children, [?SUPNAME]))), + + ok. + + +long_name(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + %% 64 + chars + VHost = <<"long_name_vhost____________________________________">>, + QName = atom_to_binary(?FUNCTION_NAME, utf8), + User = ?config(rmq_username, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + %% long name + LongName = binary:copy(QName, 240 div byte_size(QName)), + ?assertEqual({'queue.declare_ok', LongName, 0, 0}, + declare(Ch, LongName, + [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ok. start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |