summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_connection_tracking.erl25
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_vhost_limit.erl55
-rw-r--r--test/per_vhost_queue_limit_SUITE.erl66
4 files changed, 120 insertions, 28 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index b1d34ca516..9fd3244979 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -37,7 +37,7 @@
list/0, list/1, list_on_node/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- is_over_connection_limit/1, count_connections_in/1]).
+ count_connections_in/1]).
-include_lib("rabbit.hrl").
@@ -214,29 +214,6 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
--spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
-
-is_over_connection_limit(VirtualHost) ->
- case rabbit_vhost_limit:connection_limit(VirtualHost) of
- %% no limit configured
- undefined -> false;
- %% with limit = 0, no connections are allowed
- {ok, 0} -> {true, 0};
- {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
- ConnectionCount = count_connections_in(VirtualHost),
- case ConnectionCount >= Limit of
- false -> false;
- true -> {true, Limit}
- end;
- %% any negative value means "no limit". Note that parameter validation
- %% will replace negative integers with 'undefined', so this is to be
- %% explicit and extra defensive
- {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
- %% ignore non-integer limits
- {ok, _Limit} -> false
- end.
-
-
-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
count_connections_in(VirtualHost) ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 858681ecfd..53b0340b8a 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -106,7 +106,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
none -> "";
_ -> Username
end,
- try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ try rabbit_vhost_limit:is_over_connection_limit(VHost) of
false -> false;
{true, Limit} ->
rabbit_log_connection:error(
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 2d9a2f075e..54ed7c1c62 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -23,7 +23,8 @@
-export([register/0]).
-export([parse_set/2, clear/1]).
-export([validate/5, notify/4, notify_clear/3]).
--export([connection_limit/1]).
+-export([connection_limit/1, queue_limit/1,
+ is_over_queue_limit/1, is_over_connection_limit/1]).
-import(rabbit_misc, [pget/2]).
@@ -53,6 +54,55 @@ notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) ->
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
+queue_limit(VirtualHost) ->
+ get_limit(VirtualHost, <<"max-queues">>).
+
+
+-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_connection_limit(VirtualHost) ->
+ case rabbit_vhost_limit:connection_limit(VirtualHost) of
+ %% no limit configured
+ undefined -> false;
+ %% with limit = 0, no connections are allowed
+ {ok, 0} -> {true, 0};
+ {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
+ ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost),
+ case ConnectionCount >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end;
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
+ %% ignore non-integer limits
+ {ok, _Limit} -> false
+ end.
+
+
+-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
+
+is_over_queue_limit(VirtualHost) ->
+ case queue_limit(VirtualHost) of
+ %% no limit configured
+ undefined -> false;
+ %% with limit = 0, no connections are allowed
+ {ok, 0} -> {true, 0};
+ {ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
+ QueueCount = rabbit_amqqueue:count(VirtualHost),
+ case QueueCount >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end;
+ %% any negative value means "no limit". Note that parameter validation
+ %% will replace negative integers with 'undefined', so this is to be
+ %% explicit and extra defensive
+ {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false;
+ %% ignore non-integer limits
+ {ok, _Limit} -> false
+ end.
+
%%----------------------------------------------------------------------------
parse_set(VHost, Defn) ->
@@ -72,7 +122,8 @@ clear(VHost) ->
<<"limits">>).
vhost_limit_validation() ->
- [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}].
+ [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
+ {<<"max-queues">>, fun rabbit_parameter_validation:integer/2, optional}].
update_vhost(VHostName, Limits) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/test/per_vhost_queue_limit_SUITE.erl b/test/per_vhost_queue_limit_SUITE.erl
index 4edeaa24f0..e7edbe2b32 100644
--- a/test/per_vhost_queue_limit_SUITE.erl
+++ b/test/per_vhost_queue_limit_SUITE.erl
@@ -36,7 +36,9 @@ groups() ->
{cluster_size_1, [], [
most_basic_single_node_queue_count,
single_node_single_vhost_queue_count,
- single_node_multiple_vhosts_queue_count
+ single_node_multiple_vhosts_queue_count,
+ single_node_single_vhost_limit,
+ single_node_single_vhost_zero_limit
]},
{cluster_size_2, [], [
most_basic_cluster_queue_count
@@ -201,6 +203,68 @@ most_basic_cluster_queue_count(Config) ->
?assertEqual(0, count_queues_in(Config, VHost, 1)),
rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+single_node_single_vhost_limit(Config) ->
+ single_node_single_vhost_limit_with(Config, 5),
+ single_node_single_vhost_limit_with(Config, 10).
+single_node_single_vhost_zero_limit(Config) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch1} = amqp_connection:open_channel(Conn1),
+
+ set_vhost_queue_limit(Config, VHost, 0),
+
+ try
+ amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>,
+ exclusive = true}),
+ ok
+ catch _:{{shutdown, {server_initiated_close, 406, _}}, _} ->
+ %% expected
+ ok
+ end,
+
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch2} = amqp_connection:open_channel(Conn2),
+
+ %% lift the limit
+ set_vhost_queue_limit(Config, VHost, -1),
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, 100)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
+single_node_single_vhost_limit_with(Config, WatermarkLimit) ->
+ VHost = <<"queue-limits">>,
+ set_up_vhost(Config, VHost),
+ ?assertEqual(0, count_queues_in(Config, VHost)),
+
+ set_vhost_queue_limit(Config, VHost, 3),
+ Conn = open_unmanaged_connection(Config, 0, VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+
+ set_vhost_queue_limit(Config, VHost, WatermarkLimit),
+ lists:foreach(fun (_) ->
+ #'queue.declare_ok'{queue = _} =
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
+ exclusive = true})
+ end, lists:seq(1, WatermarkLimit)),
+
+ try
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>,
+ exclusive = true}),
+ ok
+ catch _:{{shutdown, {server_initiated_close, 406, _}}, _} ->
+ %% expected
+ ok
+ end,
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, VHost).
+
%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------