diff options
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 56 | ||||
| -rw-r--r-- | test/per_vhost_queue_limit_SUITE.erl | 693 |
6 files changed, 785 insertions, 27 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index b1d34ca516..9fd3244979 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -37,7 +37,7 @@ list/0, list/1, list_on_node/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, - is_over_connection_limit/1, count_connections_in/1]). + count_connections_in/1]). -include_lib("rabbit.hrl"). @@ -214,29 +214,6 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. --spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. - -is_over_connection_limit(VirtualHost) -> - case rabbit_vhost_limit:connection_limit(VirtualHost) of - %% no limit configured - undefined -> false; - %% with limit = 0, no connections are allowed - {ok, 0} -> {true, 0}; - {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> - ConnectionCount = count_connections_in(VirtualHost), - case ConnectionCount >= Limit of - false -> false; - true -> {true, Limit} - end; - %% any negative value means "no limit". Note that parameter validation - %% will replace negative integers with 'undefined', so this is to be - %% explicit and extra defensive - {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; - %% ignore non-integer limits - {ok, _Limit} -> false - end. - - -spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). count_connections_in(VirtualHost) -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 858681ecfd..53b0340b8a 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -106,7 +106,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) -> none -> ""; _ -> Username end, - try rabbit_connection_tracking:is_over_connection_limit(VHost) of + try rabbit_vhost_limit:is_over_connection_limit(VHost) of false -> false; {true, Limit} -> rabbit_log_connection:error( diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 3909096964..1bb19b23da 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -50,8 +50,20 @@ create() -> Tab, TabDef1, Reason}}) end end, definitions()), + ensure_secondary_indexes(), ok. +%% Sets up secondary indexes in a blank node database. +ensure_secondary_indexes() -> + ensure_secondary_index(rabbit_queue, vhost), + ok. + +ensure_secondary_index(Table, Field) -> + case mnesia:add_table_index(Table, Field) of + {atomic, ok} -> ok; + {aborted, {already_exists, Table, _}} -> ok + end. + %% The sequence in which we delete the schema and then the other %% tables is important: if we delete the schema first when moving to %% RAM mnesia will loudly complain since it doesn't make much sense to diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 30fe4fa17f..a53ad0c8f9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -57,6 +57,7 @@ -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). -rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}). -rabbit_upgrade({vhost_limits, mnesia, []}). +-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}). %% ------------------------------------------------------------------- @@ -91,6 +92,8 @@ -spec recoverable_slaves() -> 'ok'. -spec user_password_hashing() -> 'ok'. -spec vhost_limits() -> 'ok'. +-spec operator_policies() -> 'ok'. +-spec queue_vhost_field() -> 'ok'. %%-------------------------------------------------------------------- @@ -528,6 +531,27 @@ queue_operator_policies(Table) -> gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]). +queue_vhost_field() -> + ok = queue_vhost_field(rabbit_queue), + ok = queue_vhost_field(rabbit_durable_queue), + {atomic, ok} = mnesia:add_table_index(rabbit_queue, vhost), + {atomic, ok} = mnesia:add_table_index(rabbit_durable_queue, vhost), + ok. + +queue_vhost_field(Table) -> + transform( + Table, + fun ({amqqueue, Name = {resource, VHost, queue, _QName}, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators, + State, PolicyVersion, SlavePidsPendingShutdown, VHost} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, operator_policy, + gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index 2d9a2f075e..bd79f4dd45 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -23,7 +23,8 @@ -export([register/0]). -export([parse_set/2, clear/1]). -export([validate/5, notify/4, notify_clear/3]). --export([connection_limit/1]). +-export([connection_limit/1, queue_limit/1, + is_over_queue_limit/1, is_over_connection_limit/1]). -import(rabbit_misc, [pget/2]). @@ -53,6 +54,56 @@ notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> connection_limit(VirtualHost) -> get_limit(VirtualHost, <<"max-connections">>). +queue_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-queues">>). + + +-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_connection_limit(VirtualHost) -> + case rabbit_vhost_limit:connection_limit(VirtualHost) of + %% no limit configured + undefined -> false; + %% with limit = 0, no connections are allowed + {ok, 0} -> {true, 0}; + {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> + ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost), + case ConnectionCount >= Limit of + false -> false; + true -> {true, Limit} + end; + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; + %% ignore non-integer limits + {ok, _Limit} -> false + end. + + +-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_queue_limit(VirtualHost) -> + case queue_limit(VirtualHost) of + %% no limit configured + undefined -> false; + %% with limit = 0, no queues can be declared (perhaps not very + %% useful but consistent with the connection limit) + {ok, 0} -> {true, 0}; + {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> + QueueCount = rabbit_amqqueue:count(VirtualHost), + case QueueCount >= Limit of + false -> false; + true -> {true, Limit} + end; + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; + %% ignore non-integer limits + {ok, _Limit} -> false + end. + %%---------------------------------------------------------------------------- parse_set(VHost, Defn) -> @@ -72,7 +123,8 @@ clear(VHost) -> <<"limits">>). vhost_limit_validation() -> - [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}]. + [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional}, + {<<"max-queues">>, fun rabbit_parameter_validation:integer/2, optional}]. update_vhost(VHostName, Limits) -> rabbit_misc:execute_mnesia_transaction( diff --git a/test/per_vhost_queue_limit_SUITE.erl b/test/per_vhost_queue_limit_SUITE.erl new file mode 100644 index 0000000000..1c6bea08dd --- /dev/null +++ b/test/per_vhost_queue_limit_SUITE.erl @@ -0,0 +1,693 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_queue_limit_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-import(rabbit_ct_client_helpers, [open_unmanaged_connection/3, + close_connection_and_channel/2]). + +all() -> + [ + {group, cluster_size_1} + , {group, cluster_size_2} + ]. + +groups() -> + [ + {cluster_size_1, [], [ + most_basic_single_node_queue_count, + single_node_single_vhost_queue_count, + single_node_multiple_vhosts_queue_count, + single_node_single_vhost_limit, + single_node_single_vhost_zero_limit, + single_node_single_vhost_limit_with_durable_named_queue, + single_node_single_vhost_zero_limit_with_durable_named_queue, + single_node_single_vhost_limit_with_queue_ttl, + single_node_single_vhost_limit_with_redeclaration + ]}, + {cluster_size_2, [], [ + most_basic_cluster_queue_count, + cluster_multiple_vhosts_queue_count, + cluster_multiple_vhosts_limit, + cluster_multiple_vhosts_zero_limit, + cluster_multiple_vhosts_limit_with_durable_named_queue, + cluster_multiple_vhosts_zero_limit_with_durable_named_queue, + cluster_node_restart_queue_count + ]} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1, Config) -> + init_per_multinode_group(cluster_size_1, Config, 1); +init_per_group(cluster_size_2, Config) -> + init_per_multinode_group(cluster_size_2, Config, 2); +init_per_group(cluster_rename, Config) -> + init_per_multinode_group(cluster_rename, Config, 2). + +init_per_multinode_group(Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + case Group of + cluster_rename -> + % The broker is managed by {init,end}_per_testcase(). + Config1; + _ -> + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()) + end. + +end_per_group(cluster_rename, Config) -> + % The broker is managed by {init,end}_per_testcase(). + Config; +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config. + +end_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) -> + Config1 = ?config(save_config, Config), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +most_basic_single_node_queue_count(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + Conn = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + declare_exclusive_queues(Ch, 10), + ?assertEqual(10, count_queues_in(Config, VHost)), + close_connection_and_channel(Conn, Ch), + ?assertEqual(0, count_queues_in(Config, VHost)), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +single_node_single_vhost_queue_count(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + Conn = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + declare_exclusive_queues(Ch, 10), + ?assertEqual(10, count_queues_in(Config, VHost)), + declare_durable_queues(Ch, 10), + ?assertEqual(20, count_queues_in(Config, VHost)), + delete_durable_queues(Ch, 10), + ?assertEqual(10, count_queues_in(Config, VHost)), + close_connection_and_channel(Conn, Ch), + ?assertEqual(0, count_queues_in(Config, VHost)), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +single_node_multiple_vhosts_queue_count(Config) -> + VHost1 = <<"queue-limits1">>, + VHost2 = <<"queue-limits2">>, + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_queues_in(Config, VHost1)), + ?assertEqual(0, count_queues_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + Conn2 = open_unmanaged_connection(Config, 0, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + declare_exclusive_queues(Ch1, 10), + ?assertEqual(10, count_queues_in(Config, VHost1)), + declare_durable_queues(Ch1, 10), + ?assertEqual(20, count_queues_in(Config, VHost1)), + delete_durable_queues(Ch1, 10), + ?assertEqual(10, count_queues_in(Config, VHost1)), + declare_exclusive_queues(Ch2, 30), + ?assertEqual(30, count_queues_in(Config, VHost2)), + close_connection_and_channel(Conn1, Ch1), + ?assertEqual(0, count_queues_in(Config, VHost1)), + close_connection_and_channel(Conn2, Ch2), + ?assertEqual(0, count_queues_in(Config, VHost2)), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). + +single_node_single_vhost_limit(Config) -> + single_node_single_vhost_limit_with(Config, 5), + single_node_single_vhost_limit_with(Config, 10). +single_node_single_vhost_zero_limit(Config) -> + single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"">>, + exclusive = true}). + +single_node_single_vhost_limit_with_durable_named_queue(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + set_vhost_queue_limit(Config, VHost, 3), + Conn = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q1">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q2">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q4">>, + exclusive = false, + durable = true}) + end), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +single_node_single_vhost_zero_limit_with_durable_named_queue(Config) -> + single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>, + exclusive = false, + durable = true}). + +single_node_single_vhost_limit_with(Config, WatermarkLimit) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + set_vhost_queue_limit(Config, VHost, 3), + Conn = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + + set_vhost_queue_limit(Config, VHost, WatermarkLimit), + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, WatermarkLimit)), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +single_node_single_vhost_zero_limit_with(Config, QueueDeclare) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + set_vhost_queue_limit(Config, VHost, 0), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, QueueDeclare) + end), + + Conn2 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + %% lift the limit + set_vhost_queue_limit(Config, VHost, -1), + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, 100)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + + +single_node_single_vhost_limit_with_queue_ttl(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + set_vhost_queue_limit(Config, VHost, 3), + + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>, + exclusive = true, + arguments = [{<<"x-expires">>, long, 2000}]}) + end, lists:seq(1, 3)), + + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end), + + Conn2 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + %% wait for the queues to expire + timer:sleep(3000), + + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, + exclusive = true}), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + + +single_node_single_vhost_limit_with_redeclaration(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + set_vhost_queue_limit(Config, VHost, 3), + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}), + + %% can't declare a new queue... + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q4">>, + exclusive = false, + durable = true}) + end), + + + Conn2 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + %% ...but re-declarations succeed + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q4">>, + exclusive = false, + durable = true}) + end), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + + +most_basic_cluster_queue_count(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost, 0)), + ?assertEqual(0, count_queues_in(Config, VHost, 1)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + declare_exclusive_queues(Ch1, 10), + ?assertEqual(10, count_queues_in(Config, VHost, 0)), + ?assertEqual(10, count_queues_in(Config, VHost, 1)), + + Conn2 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + declare_exclusive_queues(Ch2, 15), + ?assertEqual(25, count_queues_in(Config, VHost, 0)), + ?assertEqual(25, count_queues_in(Config, VHost, 1)), + close_connection_and_channel(Conn1, Ch1), + close_connection_and_channel(Conn2, Ch2), + ?assertEqual(0, count_queues_in(Config, VHost, 0)), + ?assertEqual(0, count_queues_in(Config, VHost, 1)), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +cluster_node_restart_queue_count(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost, 0)), + ?assertEqual(0, count_queues_in(Config, VHost, 1)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + declare_exclusive_queues(Ch1, 10), + ?assertEqual(10, count_queues_in(Config, VHost, 0)), + ?assertEqual(10, count_queues_in(Config, VHost, 1)), + + rabbit_ct_broker_helpers:restart_broker(Config, 0), + ?assertEqual(0, count_queues_in(Config, VHost, 0)), + + Conn2 = open_unmanaged_connection(Config, 1, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + declare_exclusive_queues(Ch2, 15), + ?assertEqual(15, count_queues_in(Config, VHost, 0)), + ?assertEqual(15, count_queues_in(Config, VHost, 1)), + + declare_durable_queues(Ch2, 10), + ?assertEqual(25, count_queues_in(Config, VHost, 0)), + ?assertEqual(25, count_queues_in(Config, VHost, 1)), + + rabbit_ct_broker_helpers:restart_broker(Config, 1), + + ?assertEqual(10, count_queues_in(Config, VHost, 0)), + ?assertEqual(10, count_queues_in(Config, VHost, 1)), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + + +cluster_multiple_vhosts_queue_count(Config) -> + VHost1 = <<"queue-limits1">>, + VHost2 = <<"queue-limits2">>, + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_queues_in(Config, VHost1)), + ?assertEqual(0, count_queues_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + declare_exclusive_queues(Ch1, 10), + ?assertEqual(10, count_queues_in(Config, VHost1, 0)), + ?assertEqual(10, count_queues_in(Config, VHost1, 1)), + ?assertEqual(0, count_queues_in(Config, VHost2, 0)), + ?assertEqual(0, count_queues_in(Config, VHost2, 1)), + + Conn2 = open_unmanaged_connection(Config, 0, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + declare_exclusive_queues(Ch2, 15), + ?assertEqual(15, count_queues_in(Config, VHost2, 0)), + ?assertEqual(15, count_queues_in(Config, VHost2, 1)), + close_connection_and_channel(Conn1, Ch1), + close_connection_and_channel(Conn2, Ch2), + ?assertEqual(0, count_queues_in(Config, VHost1, 0)), + ?assertEqual(0, count_queues_in(Config, VHost1, 1)), + ?assertEqual(0, count_queues_in(Config, VHost2, 0)), + ?assertEqual(0, count_queues_in(Config, VHost2, 1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). + +cluster_multiple_vhosts_limit(Config) -> + cluster_multiple_vhosts_limit_with(Config, 10), + cluster_multiple_vhosts_limit_with(Config, 20). + +cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) -> + VHost1 = <<"queue-limits1">>, + VHost2 = <<"queue-limits2">>, + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + ?assertEqual(0, count_queues_in(Config, VHost1)), + ?assertEqual(0, count_queues_in(Config, VHost2)), + + set_vhost_queue_limit(Config, VHost1, 3), + set_vhost_queue_limit(Config, VHost2, 3), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + set_vhost_queue_limit(Config, VHost1, WatermarkLimit), + + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, WatermarkLimit)), + + Conn2 = open_unmanaged_connection(Config, 1, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + set_vhost_queue_limit(Config, VHost2, WatermarkLimit), + + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, WatermarkLimit)), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end), + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). + + +cluster_multiple_vhosts_zero_limit(Config) -> + cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"">>, + exclusive = true}). + +cluster_multiple_vhosts_limit_with_durable_named_queue(Config) -> + VHost1 = <<"queue-limits1">>, + VHost2 = <<"queue-limits2">>, + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + ?assertEqual(0, count_queues_in(Config, VHost1)), + ?assertEqual(0, count_queues_in(Config, VHost2)), + + set_vhost_queue_limit(Config, VHost1, 3), + set_vhost_queue_limit(Config, VHost2, 3), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + Conn2 = open_unmanaged_connection(Config, 1, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + #'queue.declare_ok'{} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{} = + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}), + + #'queue.declare_ok'{} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>, + exclusive = false, + durable = true}), + #'queue.declare_ok'{} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}) + end), + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>, + exclusive = false, + durable = true}) + end), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). + +cluster_multiple_vhosts_zero_limit_with_durable_named_queue(Config) -> + cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>, + exclusive = false, + durable = true}). + +cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) -> + VHost1 = <<"queue-limits1">>, + VHost2 = <<"queue-limits2">>, + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + ?assertEqual(0, count_queues_in(Config, VHost1)), + ?assertEqual(0, count_queues_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + Conn2 = open_unmanaged_connection(Config, 1, VHost2), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + set_vhost_queue_limit(Config, VHost1, 0), + set_vhost_queue_limit(Config, VHost2, 0), + + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch1, QueueDeclare) + end), + expect_shutdown_due_to_precondition_failed( + fun () -> + amqp_channel:call(Ch2, QueueDeclare) + end), + + + Conn3 = open_unmanaged_connection(Config, 0, VHost1), + {ok, Ch3} = amqp_connection:open_channel(Conn3), + Conn4 = open_unmanaged_connection(Config, 1, VHost2), + {ok, Ch4} = amqp_connection:open_channel(Conn4), + + %% lift the limits + set_vhost_queue_limit(Config, VHost1, -1), + set_vhost_queue_limit(Config, VHost2, -1), + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch3, #'queue.declare'{queue = <<"">>, + exclusive = true}), + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch4, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, 400)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2). + + + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost), + set_vhost_queue_limit(Config, VHost, -1). + +set_vhost_queue_limit(Config, VHost, Count) -> + set_vhost_queue_limit(Config, 0, VHost, Count). + +set_vhost_queue_limit(Config, NodeIndex, VHost, Count) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:control_action( + set_vhost_limits, Node, + ["{\"max-queues\": " ++ integer_to_list(Count) ++ "}"], + [{"-p", binary_to_list(VHost)}]). + +count_queues_in(Config, VHost) -> + count_queues_in(Config, VHost, 0). +count_queues_in(Config, VHost, NodeIndex) -> + timer:sleep(200), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_amqqueue, + count, [VHost]). + +declare_exclusive_queues(Ch, N) -> + lists:foreach(fun (_) -> + amqp_channel:call(Ch, + #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, + lists:seq(1, N)). + +declare_durable_queues(Ch, N) -> + lists:foreach(fun (I) -> + amqp_channel:call(Ch, + #'queue.declare'{queue = durable_queue_name(I), + exclusive = false, + durable = true}) + end, + lists:seq(1, N)). + +delete_durable_queues(Ch, N) -> + lists:foreach(fun (I) -> + amqp_channel:call(Ch, + #'queue.delete'{queue = durable_queue_name(I)}) + end, + lists:seq(1, N)). + +durable_queue_name(N) when is_integer(N) -> + iolist_to_binary(io_lib:format("queue-limits-durable-~p", [N])). + +expect_shutdown_due_to_precondition_failed(Thunk) -> + try + Thunk(), + ok + catch _:{{shutdown, {server_initiated_close, 406, _}}, _} -> + %% expected + ok + end. |
