diff options
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 55 | ||||
| -rw-r--r-- | test/per_vhost_queue_limit_SUITE.erl | 66 |
4 files changed, 120 insertions, 28 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index b1d34ca516..9fd3244979 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -37,7 +37,7 @@ list/0, list/1, list_on_node/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, - is_over_connection_limit/1, count_connections_in/1]). + count_connections_in/1]). -include_lib("rabbit.hrl"). @@ -214,29 +214,6 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. --spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. - -is_over_connection_limit(VirtualHost) -> - case rabbit_vhost_limit:connection_limit(VirtualHost) of - %% no limit configured - undefined -> false; - %% with limit = 0, no connections are allowed - {ok, 0} -> {true, 0}; - {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> - ConnectionCount = count_connections_in(VirtualHost), - case ConnectionCount >= Limit of - false -> false; - true -> {true, Limit} - end; - %% any negative value means "no limit". Note that parameter validation - %% will replace negative integers with 'undefined', so this is to be - %% explicit and extra defensive - {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; - %% ignore non-integer limits - {ok, _Limit} -> false - end. - - -spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). count_connections_in(VirtualHost) -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 858681ecfd..53b0340b8a 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -106,7 +106,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) -> none -> ""; _ -> Username end, - try rabbit_connection_tracking:is_over_connection_limit(VHost) of + try rabbit_vhost_limit:is_over_connection_limit(VHost) of false -> false; {true, Limit} -> rabbit_log_connection:error( diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index 2d9a2f075e..54ed7c1c62 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -23,7 +23,8 @@ -export([register/0]). -export([parse_set/2, clear/1]). -export([validate/5, notify/4, notify_clear/3]). --export([connection_limit/1]). +-export([connection_limit/1, queue_limit/1, + is_over_queue_limit/1, is_over_connection_limit/1]). -import(rabbit_misc, [pget/2]). @@ -53,6 +54,55 @@ notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> connection_limit(VirtualHost) -> get_limit(VirtualHost, <<"max-connections">>). +queue_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-queues">>). + + +-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_connection_limit(VirtualHost) -> + case rabbit_vhost_limit:connection_limit(VirtualHost) of + %% no limit configured + undefined -> false; + %% with limit = 0, no connections are allowed + {ok, 0} -> {true, 0}; + {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> + ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost), + case ConnectionCount >= Limit of + false -> false; + true -> {true, Limit} + end; + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; + %% ignore non-integer limits + {ok, _Limit} -> false + end. + + +-spec is_over_queue_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false. + +is_over_queue_limit(VirtualHost) -> + case queue_limit(VirtualHost) of + %% no limit configured + undefined -> false; + %% with limit = 0, no connections are allowed + {ok, 0} -> {true, 0}; + {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> + QueueCount = rabbit_amqqueue:count(VirtualHost), + case QueueCount >= Limit of + false -> false; + true -> {true, Limit} + end; + %% any negative value means "no limit". Note that parameter validation + %% will replace negative integers with 'undefined', so this is to be + %% explicit and extra defensive + {ok, Limit} when is_integer(Limit) andalso Limit < 0 -> false; + %% ignore non-integer limits + {ok, _Limit} -> false + end. + %%---------------------------------------------------------------------------- parse_set(VHost, Defn) -> @@ -72,7 +122,8 @@ clear(VHost) -> <<"limits">>). vhost_limit_validation() -> - [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, mandatory}]. + [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional}, + {<<"max-queues">>, fun rabbit_parameter_validation:integer/2, optional}]. update_vhost(VHostName, Limits) -> rabbit_misc:execute_mnesia_transaction( diff --git a/test/per_vhost_queue_limit_SUITE.erl b/test/per_vhost_queue_limit_SUITE.erl index 4edeaa24f0..e7edbe2b32 100644 --- a/test/per_vhost_queue_limit_SUITE.erl +++ b/test/per_vhost_queue_limit_SUITE.erl @@ -36,7 +36,9 @@ groups() -> {cluster_size_1, [], [ most_basic_single_node_queue_count, single_node_single_vhost_queue_count, - single_node_multiple_vhosts_queue_count + single_node_multiple_vhosts_queue_count, + single_node_single_vhost_limit, + single_node_single_vhost_zero_limit ]}, {cluster_size_2, [], [ most_basic_cluster_queue_count @@ -201,6 +203,68 @@ most_basic_cluster_queue_count(Config) -> ?assertEqual(0, count_queues_in(Config, VHost, 1)), rabbit_ct_broker_helpers:delete_vhost(Config, VHost). +single_node_single_vhost_limit(Config) -> + single_node_single_vhost_limit_with(Config, 5), + single_node_single_vhost_limit_with(Config, 10). +single_node_single_vhost_zero_limit(Config) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch1} = amqp_connection:open_channel(Conn1), + + set_vhost_queue_limit(Config, VHost, 0), + + try + amqp_channel:call(Ch1, #'queue.declare'{queue = <<"">>, + exclusive = true}), + ok + catch _:{{shutdown, {server_initiated_close, 406, _}}, _} -> + %% expected + ok + end, + + Conn2 = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch2} = amqp_connection:open_channel(Conn2), + + %% lift the limit + set_vhost_queue_limit(Config, VHost, -1), + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch2, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, 100)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + +single_node_single_vhost_limit_with(Config, WatermarkLimit) -> + VHost = <<"queue-limits">>, + set_up_vhost(Config, VHost), + ?assertEqual(0, count_queues_in(Config, VHost)), + + set_vhost_queue_limit(Config, VHost, 3), + Conn = open_unmanaged_connection(Config, 0, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + + set_vhost_queue_limit(Config, VHost, WatermarkLimit), + lists:foreach(fun (_) -> + #'queue.declare_ok'{queue = _} = + amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>, + exclusive = true}) + end, lists:seq(1, WatermarkLimit)), + + try + amqp_channel:call(Ch, #'queue.declare'{queue = <<"">>, + exclusive = true}), + ok + catch _:{{shutdown, {server_initiated_close, 406, _}}, _} -> + %% expected + ok + end, + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- |
