summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_connection_tracking.erl25
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_table.erl12
-rw-r--r--src/rabbit_upgrade_functions.erl24
-rw-r--r--src/rabbit_vhost_limit.erl56
-rw-r--r--test/per_vhost_queue_limit_SUITE.erl693
6 files changed, 785 insertions, 27 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index b1d34ca516..9fd3244979 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -37,7 +37,7 @@
list/0, list/1, list_on_node/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- is_over_connection_limit/1, count_connections_in/1]).
+ count_connections_in/1]).
-include_lib("rabbit.hrl").
@@ -214,29 +214,6 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
--spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
-
-is_over_connection_limit(VirtualHost) ->
- case rabbit_vhost_limit:connection_limit(VirtualHost) of
- %% no limit configured
- undefined -> false;
- %% with limit = 0, no connections are allowed
- {ok, 0} -> {true, 0};
- {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
- ConnectionCount = count_connections_in(VirtualHost),
- case ConnectionCount >= Limit of
- false -> false;
- true -> {true, Limit}
- end;
- %% any negative value means "no limit". Note that parameter validation
- %% will replace negative integers with 'undefined', so this is to be
- %% explicit and extra defensive
- {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
- %% ignore non-integer limits
- {ok, _Limit} -> false
- end.
-
-
-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
count_connections_in(VirtualHost) ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 858681ecfd..53b0340b8a 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -106,7 +106,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
none -> "";
_ -> Username
end,
- try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ try rabbit_vhost_limit:is_over_connection_limit(VHost) of
false -> false;
{true, Limit} ->
rabbit_log_connection:error(
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 3909096964..1bb19b23da 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -50,8 +50,20 @@ create() ->
Tab, TabDef1, Reason}})
end
end, definitions()),
+ ensure_secondary_indexes(),
ok.
+%% Sets up secondary indexes in a blank node database.
+ensure_secondary_indexes() ->
+ ensure_secondary_index(rabbit_queue, vhost),
+ ok.
+
+ensure_secondary_index(Table, Field) ->
+ case mnesia:add_table_index(Table, Field) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, Table, _}} -> ok
+ end.
+
%% The sequence in which we delete the schema and then the other
%% tables is important: if we delete the schema first when moving to
%% RAM mnesia will loudly complain since it doesn't make much sense to
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 30fe4fa17f..a53ad0c8f9 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -57,6 +57,7 @@
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
-rabbit_upgrade({operator_policies, mnesia, [slave_pids_pending_shutdown, internal_system_x]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
+-rabbit_upgrade({queue_vhost_field, mnesia, [operator_policies]}).
%% -------------------------------------------------------------------
@@ -91,6 +92,8 @@
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
-spec vhost_limits() -> 'ok'.
+-spec operator_policies() -> 'ok'.
+-spec queue_vhost_field() -> 'ok'.
%%--------------------------------------------------------------------
@@ -528,6 +531,27 @@ queue_operator_policies(Table) ->
gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown]).
+queue_vhost_field() ->
+ ok = queue_vhost_field(rabbit_queue),
+ ok = queue_vhost_field(rabbit_durable_queue),
+ {atomic, ok} = mnesia:add_table_index(rabbit_queue, vhost),
+ {atomic, ok} = mnesia:add_table_index(rabbit_durable_queue, vhost),
+ ok.
+
+queue_vhost_field(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name = {resource, VHost, queue, _QName}, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, OperatorPolicy, GmPids, Decorators,
+ State, PolicyVersion, SlavePidsPendingShutdown, VHost}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, operator_policy,
+ gm_pids, decorators, state, policy_version, slave_pids_pending_shutdown, vhost]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 2d9a2f075e..bd79f4dd45 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -23,7 +23,8 @@
-export([register/0]).
-export([parse_set/2, clear/1]).
-export([validate/5, notify/4, notify_clear/3]).
--export([connection_limit/1]).
+-export([connection_limit/1, queue_limit/1,
+ is_over_queue_limit/1, is_over_connection_limit/1]).
-import(rabbit_misc, [pget/2]).
@@ -53,6 +54,56 @@ notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
+queue_limit(VirtualHost) ->
+ get_limit(VirtualHost, <<"max-queues">>).
+
+
+-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_connection_limit(VirtualHost) ->
+ case rabbit_vhost_limit:connection_limit(VirtualHost) of
+ %% no limit configured
+ undefined -> false;
+ %% with limit = 0, no connections are allowed
+ {ok, 0} -> {true, 0};
+ {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
+ ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost),
+ case ConnectionCount >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end;
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
+ %% ignore non-integer limits
+ {ok, _Limit} -> false
+ end.
+
+
+-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_queue_limit(VirtualHost) ->
+ case queue_limit(VirtualHost) of
+ %% no limit configured
+ undefined -> false;
+ %% with limit = 0, no queues can be declared (perhaps not very
+ %% useful but consistent with the connection limit)
+ {ok, 0} -> {true, 0};
+ {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
+ QueueCount = rabbit_amqqueue:count(VirtualHost),
+ case QueueCount >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end;
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
+ %% ignore non-integer limits
+ {ok, _Limit} -> false
+ end.
+
%%----------------------------------------------------------------------------
parse_set(VHost, Defn) ->
@@ -72,7 +123,8 @@ clear(VHost) ->
<<"limits">>).
vhost_limit_validation() ->
- [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}].
+ [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
+ {<<"max-queues">>, fun rabbit_parameter_validation:integer/2, optional}].
update_vhost(VHostName, Limits) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/test/per_vhost_queue_limit_SUITE.erl b/test/per_vhost_queue_limit_SUITE.erl
new file mode 100644
index 0000000000..1c6bea08dd
--- /dev/null
+++ b/test/per_vhost_queue_limit_SUITE.erl
@@ -0,0 +1,693 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(per_vhost_queue_limit_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-import(rabbit_ct_client_helpers, [open_unmanaged_connection/3,
+ close_connection_and_channel/2]).
+
+all() ->
+ [
+ {group, cluster_size_1}
+ , {group, cluster_size_2}
+ ].
+
+groups() ->
+ [
+ {cluster_size_1, [], [
+ most_basic_single_node_queue_count,
+ single_node_single_vhost_queue_count,
+ single_node_multiple_vhosts_queue_count,
+ single_node_single_vhost_limit,
+ single_node_single_vhost_zero_limit,
+ single_node_single_vhost_limit_with_durable_named_queue,
+ single_node_single_vhost_zero_limit_with_durable_named_queue,
+ single_node_single_vhost_limit_with_queue_ttl,
+ single_node_single_vhost_limit_with_redeclaration
+ ]},
+ {cluster_size_2, [], [
+ most_basic_cluster_queue_count,
+ cluster_multiple_vhosts_queue_count,
+ cluster_multiple_vhosts_limit,
+ cluster_multiple_vhosts_zero_limit,
+ cluster_multiple_vhosts_limit_with_durable_named_queue,
+ cluster_multiple_vhosts_zero_limit_with_durable_named_queue,
+ cluster_node_restart_queue_count
+ ]}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1, Config) ->
+ init_per_multinode_group(cluster_size_1, Config, 1);
+init_per_group(cluster_size_2, Config) ->
+ init_per_multinode_group(cluster_size_2, Config, 2);
+init_per_group(cluster_rename, Config) ->
+ init_per_multinode_group(cluster_rename, Config, 2).
+
+init_per_multinode_group(Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ case Group of
+ cluster_rename ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config1;
+ _ ->
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps())
+ end.
+
+end_per_group(cluster_rename, Config) ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config;
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Config.
+
+end_per_testcase(vhost_limit_after_node_renamed = Testcase, Config) ->
+ Config1 = ?config(save_config, Config),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+most_basic_single_node_queue_count(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+ Conn = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ declare_exclusive_queues(Ch, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost)),
+ close_connection_and_channel(Conn, Ch),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+single_node_single_vhost_queue_count(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+ Conn = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ declare_exclusive_queues(Ch, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost)),
+ declare_durable_queues(Ch, 10),
+ ?assertEqual(20, count_queues_in(Config, VHost)),
+ delete_durable_queues(Ch, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost)),
+ close_connection_and_channel(Conn, Ch),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+single_node_multiple_vhosts_queue_count(Config) ->
+ VHost1 = <<"queue-limits1">>,
+ VHost2 = <<"queue-limits2">>,
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ Conn2 = open_unmanaged_connection(Config, 0, VHost2),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ declare_exclusive_queues(Ch1, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost1)),
+ declare_durable_queues(Ch1, 10),
+ ?assertEqual(20, count_queues_in(Config, VHost1)),
+ delete_durable_queues(Ch1, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost1)),
+ declare_exclusive_queues(Ch2, 30),
+ ?assertEqual(30, count_queues_in(Config, VHost2)),
+ close_connection_and_channel(Conn1, Ch1),
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ close_connection_and_channel(Conn2, Ch2),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+single_node_single_vhost_limit(Config) ->
+ single_node_single_vhost_limit_with(Config, 5),
+ single_node_single_vhost_limit_with(Config, 10).
+single_node_single_vhost_zero_limit(Config) ->
+ single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"">>,
+ exclusive = true}).
+
+single_node_single_vhost_limit_with_durable_named_queue(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ set_vhost_queue_limit(Config, VHost, 3),
+ Conn = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q1">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q2">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true}),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"Q4">>,
+ exclusive = false,
+ durable = true})
+ end),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+single_node_single_vhost_zero_limit_with_durable_named_queue(Config) ->
+ single_node_single_vhost_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>,
+ exclusive = false,
+ durable = true}).
+
+single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ set_vhost_queue_limit(Config, VHost, 3),
+ Conn = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ set_vhost_queue_limit(Config, VHost, WatermarkLimit),
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, WatermarkLimit)),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+single_node_single_vhost_zero_limit_with(Config, QueueDeclare) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+
+ set_vhost_queue_limit(Config, VHost, 0),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, QueueDeclare)
+ end),
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ %% lift the limit
+ set_vhost_queue_limit(Config, VHost, -1),
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, 100)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+
+single_node_single_vhost_limit_with_queue_ttl(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+
+ set_vhost_queue_limit(Config, VHost, 3),
+
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
+ exclusive = true,
+ arguments = [{<<"x-expires">>, long, 2000}]})
+ end, lists:seq(1, 3)),
+
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end),
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ %% wait for the queues to expire
+ timer:sleep(3000),
+
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
+ exclusive = true}),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+
+single_node_single_vhost_limit_with_redeclaration(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ set_vhost_queue_limit(Config, VHost, 3),
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true}),
+
+ %% can't declare a new queue...
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q4">>,
+ exclusive = false,
+ durable = true})
+ end),
+
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ %% ...but re-declarations succeed
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true}),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q4">>,
+ exclusive = false,
+ durable = true})
+ end),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+
+most_basic_cluster_queue_count(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost, 1)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ declare_exclusive_queues(Ch1, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(10, count_queues_in(Config, VHost, 1)),
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+ declare_exclusive_queues(Ch2, 15),
+ ?assertEqual(25, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(25, count_queues_in(Config, VHost, 1)),
+ close_connection_and_channel(Conn1, Ch1),
+ close_connection_and_channel(Conn2, Ch2),
+ ?assertEqual(0, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost, 1)),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+cluster_node_restart_queue_count(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost, 1)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ declare_exclusive_queues(Ch1, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(10, count_queues_in(Config, VHost, 1)),
+
+ rabbit_ct_broker_helpers:restart_broker(Config, 0),
+ ?assertEqual(0, count_queues_in(Config, VHost, 0)),
+
+ Conn2 = open_unmanaged_connection(Config, 1, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+ declare_exclusive_queues(Ch2, 15),
+ ?assertEqual(15, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(15, count_queues_in(Config, VHost, 1)),
+
+ declare_durable_queues(Ch2, 10),
+ ?assertEqual(25, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(25, count_queues_in(Config, VHost, 1)),
+
+ rabbit_ct_broker_helpers:restart_broker(Config, 1),
+
+ ?assertEqual(10, count_queues_in(Config, VHost, 0)),
+ ?assertEqual(10, count_queues_in(Config, VHost, 1)),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+
+cluster_multiple_vhosts_queue_count(Config) ->
+ VHost1 = <<"queue-limits1">>,
+ VHost2 = <<"queue-limits2">>,
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ declare_exclusive_queues(Ch1, 10),
+ ?assertEqual(10, count_queues_in(Config, VHost1, 0)),
+ ?assertEqual(10, count_queues_in(Config, VHost1, 1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost2, 1)),
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost2),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+ declare_exclusive_queues(Ch2, 15),
+ ?assertEqual(15, count_queues_in(Config, VHost2, 0)),
+ ?assertEqual(15, count_queues_in(Config, VHost2, 1)),
+ close_connection_and_channel(Conn1, Ch1),
+ close_connection_and_channel(Conn2, Ch2),
+ ?assertEqual(0, count_queues_in(Config, VHost1, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost1, 1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2, 0)),
+ ?assertEqual(0, count_queues_in(Config, VHost2, 1)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+cluster_multiple_vhosts_limit(Config) ->
+ cluster_multiple_vhosts_limit_with(Config, 10),
+ cluster_multiple_vhosts_limit_with(Config, 20).
+
+cluster_multiple_vhosts_limit_with(Config, WatermarkLimit) ->
+ VHost1 = <<"queue-limits1">>,
+ VHost2 = <<"queue-limits2">>,
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+
+ set_vhost_queue_limit(Config, VHost1, 3),
+ set_vhost_queue_limit(Config, VHost2, 3),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ set_vhost_queue_limit(Config, VHost1, WatermarkLimit),
+
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, WatermarkLimit)),
+
+ Conn2 = open_unmanaged_connection(Config, 1, VHost2),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+ set_vhost_queue_limit(Config, VHost2, WatermarkLimit),
+
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, WatermarkLimit)),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end),
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+
+cluster_multiple_vhosts_zero_limit(Config) ->
+ cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"">>,
+ exclusive = true}).
+
+cluster_multiple_vhosts_limit_with_durable_named_queue(Config) ->
+ VHost1 = <<"queue-limits1">>,
+ VHost2 = <<"queue-limits2">>,
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+
+ set_vhost_queue_limit(Config, VHost1, 3),
+ set_vhost_queue_limit(Config, VHost2, 3),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+
+ Conn2 = open_unmanaged_connection(Config, 1, VHost2),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q1">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q2">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true}),
+
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q1">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q2">>,
+ exclusive = false,
+ durable = true}),
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true}),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true})
+ end),
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"Q3">>,
+ exclusive = false,
+ durable = true})
+ end),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+cluster_multiple_vhosts_zero_limit_with_durable_named_queue(Config) ->
+ cluster_multiple_vhosts_zero_limit_with(Config, #'queue.declare'{queue = <<"Q4">>,
+ exclusive = false,
+ durable = true}).
+
+cluster_multiple_vhosts_zero_limit_with(Config, QueueDeclare) ->
+ VHost1 = <<"queue-limits1">>,
+ VHost2 = <<"queue-limits2">>,
+ set_up_vhost(Config, VHost1),
+ set_up_vhost(Config, VHost2),
+ ?assertEqual(0, count_queues_in(Config, VHost1)),
+ ?assertEqual(0, count_queues_in(Config, VHost2)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+ Conn2 = open_unmanaged_connection(Config, 1, VHost2),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ set_vhost_queue_limit(Config, VHost1, 0),
+ set_vhost_queue_limit(Config, VHost2, 0),
+
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch1, QueueDeclare)
+ end),
+ expect_shutdown_due_to_precondition_failed(
+ fun () ->
+ amqp_channel:call(Ch2, QueueDeclare)
+ end),
+
+
+ Conn3 = open_unmanaged_connection(Config, 0, VHost1),
+ {ok, Ch3} = amqp_connection:open_channel(Conn3),
+ Conn4 = open_unmanaged_connection(Config, 1, VHost2),
+ {ok, Ch4} = amqp_connection:open_channel(Conn4),
+
+ %% lift the limits
+ set_vhost_queue_limit(Config, VHost1, -1),
+ set_vhost_queue_limit(Config, VHost2, -1),
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch3, #'queue.declare'{queue = <<"">>,
+ exclusive = true}),
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch4, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, 400)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost1),
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost2).
+
+
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_queue_limit(Config, VHost, -1).
+
+set_vhost_queue_limit(Config, VHost, Count) ->
+ set_vhost_queue_limit(Config, 0, VHost, Count).
+
+set_vhost_queue_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-queues\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+count_queues_in(Config, VHost) ->
+ count_queues_in(Config, VHost, 0).
+count_queues_in(Config, VHost, NodeIndex) ->
+ timer:sleep(200),
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_amqqueue,
+ count, [VHost]).
+
+declare_exclusive_queues(Ch, N) ->
+ lists:foreach(fun (_) ->
+ amqp_channel:call(Ch,
+ #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end,
+ lists:seq(1, N)).
+
+declare_durable_queues(Ch, N) ->
+ lists:foreach(fun (I) ->
+ amqp_channel:call(Ch,
+ #'queue.declare'{queue = durable_queue_name(I),
+ exclusive = false,
+ durable = true})
+ end,
+ lists:seq(1, N)).
+
+delete_durable_queues(Ch, N) ->
+ lists:foreach(fun (I) ->
+ amqp_channel:call(Ch,
+ #'queue.delete'{queue = durable_queue_name(I)})
+ end,
+ lists:seq(1, N)).
+
+durable_queue_name(N) when is_integer(N) ->
+ iolist_to_binary(io_lib:format("queue-limits-durable-~p", [N])).
+
+expect_shutdown_due_to_precondition_failed(Thunk) ->
+ try
+ Thunk(),
+ ok
+ catch _:{{shutdown, {server_initiated_close, 406, _}}, _} ->
+ %% expected
+ ok
+ end.