diff options
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 55 | ||||
| -rw-r--r-- | include/rabbit_cli.hrl | 1 | ||||
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_connection_tracker.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 229 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 98 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_SUITE.erl | 660 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_partitions_SUITE.erl | 164 |
15 files changed, 1526 insertions, 11 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 363748e046..9cd1198a28 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1215,6 +1215,55 @@ </refsect2> <refsect2> + <title>Virtual Host Limits</title> + <para> + It is possible to enforce certain limits on virtual hosts. + </para> + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>set_vhost_limits</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="req"><replaceable>definition</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Sets virtual host limits + </para> + <variablelist> + <varlistentry> + <term>definition</term> + <listitem><para> + The definition of the limits, as a + JSON term. In most shells you are very likely to + need to quote this. + + Recognised limits: max-connections (0 means "no limit"). + </para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_vhost_limits -p qa_env '{"max-connections": 1024}'</screen> + <para role="example"> + This command limits the max number of concurrent connections in vhost <command>qa_env</command> + to 1024. + </para> + </listitem> + </varlistentry> + +<varlistentry> + <term><cmdsynopsis><command>clear_vhost_limits</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <listitem> + <para> + Clears virtual host limits + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_vhost_limits -p qa_env</screen> + <para role="example"> + This command clears vhost limits in vhost <command>qa_env</command>. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect2> + + <refsect2> <title>Server Status</title> <para> The server status queries interrogate the server and return a list of @@ -2036,9 +2085,9 @@ <varlistentry> <term>fraction</term> <listitem><para> - Limit relative to the total amount available RAM - as a non-negative floating point number. - Values lower than 1.0 can be dangerous and + Limit relative to the total amount available RAM + as a non-negative floating point number. + Values lower than 1.0 can be dangerous and should be used carefully. </para></listitem> </varlistentry> diff --git a/include/rabbit_cli.hrl b/include/rabbit_cli.hrl index a0d1ecfdd5..0e338855cd 100644 --- a/include/rabbit_cli.hrl +++ b/include/rabbit_cli.hrl @@ -30,7 +30,6 @@ -define(OFFLINE_OPT, "--offline"). -define(ONLINE_OPT, "--online"). - -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). diff --git a/src/rabbit.erl b/src/rabbit.erl index 2fa4cdee71..59ede3c802 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -178,6 +178,11 @@ {mfa, {rabbit_direct, boot, []}}, {requires, log_relay}]}). +-rabbit_boot_step({connection_tracker, + [{description, "helps track node-local connections"}, + {mfa, {rabbit_connection_tracker, boot, []}}, + {requires, log_relay}]}). + -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, {requires, log_relay}]}). diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl new file mode 100644 index 0000000000..0177627d83 --- /dev/null +++ b/src/rabbit_connection_tracker.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracker). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-behaviour(gen_server2). + +%% API +-export([boot/0, start_link/0, reregister/1]). + +%% gen_fsm callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + + +%%%=================================================================== +%%% API +%%%=================================================================== + +boot() -> + {ok, _} = start_link(), + ok. + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +reregister(Node) -> + rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]), + gen_server2:cast({?SERVER, Node}, reregister). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + {ok, {}}. + +handle_call(_Req, _From, State) -> + {noreply, State}. + +handle_cast(reregister, State) -> + Cs = rabbit_networking:connections_local(), + rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]), + case Cs of + [] -> ok; + Cs -> + [reregister_connection(C) || C <- Cs], + ok + end, + rabbit_log:info("Done re-registering client connections"), + {noreply, State}. + +handle_info(_Req, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +reregister_connection(Conn) -> + try + Conn ! reregister + catch _:Error -> + rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error]) + end. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl new file mode 100644 index 0000000000..c65023a2a8 --- /dev/null +++ b/src/rabbit_connection_tracking.erl @@ -0,0 +1,229 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_connection_tracking_handler +%% * rabbit_reader +%% * rabbit_event + +-export([register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, + tracked_connection_from_connection_created/1, + tracked_connection_from_connection_state/1, + is_over_connection_limit/1, count_connections_in/1, + on_node_down/1, on_node_up/1]). + +-include_lib("rabbit.hrl"). + +-define(TABLE, rabbit_tracked_connection). +-define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost). +-define(SERVER, ?MODULE). + +%% +%% API +%% + +-spec register_connection(rabbit_types:tracked_connection()) -> ok. + +register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + %% upsert + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> + mnesia:write(?TABLE, Conn, write), + mnesia:dirty_update_counter( + rabbit_tracked_connection_per_vhost, VHost, 1); + [_Row] -> + ok + end, + ok + end). + +-spec unregister_connection(rabbit_types:connection_name()) -> ok. + +unregister_connection(ConnId = {_Node, _Name}) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:dirty_read(?TABLE, ConnId) of + [] -> ok; + [Row] -> + mnesia:dirty_update_counter( + ?PER_VHOST_COUNTER_TABLE, + Row#tracked_connection.vhost, -1), + mnesia:delete({?TABLE, ConnId}) + end + end). + + +-spec list() -> [rabbit_types:tracked_connection()]. + +list() -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}). + + +-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list(VHost) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}). + + +-spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node) -> + mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}). + + +-spec on_node_down(node()) -> ok. + +on_node_down(Node) -> + case lists:member(Node, nodes()) of + false -> + Cs = list_on_node(Node), + rabbit_log:info( + "Node ~p is down, unregistering ~p client connections~n", + [Node, length(Cs)]), + [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs], + ok; + true -> rabbit_log:info( + "Keeping ~s connections: the node is already back~n", [Node]) + end. + +-spec on_node_up(node()) -> ok. +on_node_up(Node) -> + rabbit_connection_tracker:reregister(Node), + ok. + +-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean(). + +is_over_connection_limit(VirtualHost) -> + ConnectionCount = count_connections_in(VirtualHost), + case rabbit_vhost_limit:connection_limit(VirtualHost) of + undefined -> false; + {ok, Limit} -> case {ConnectionCount, ConnectionCount >= Limit} of + %% 0 = no limit + {0, _} -> false; + %% the limit hasn't been reached + {_, false} -> false; + {_N, true} -> {true, Limit} + end + end. + + +-spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). + +count_connections_in(VirtualHost) -> + try + case mnesia:transaction( + fun() -> + case mnesia:dirty_read( + {?PER_VHOST_COUNTER_TABLE, + VirtualHost}) of + [] -> 0; + [Val] -> + Val#tracked_connection_per_vhost.connection_count + end + end) of + {atomic, Val} -> Val; + {aborted, _Reason} -> 0 + end + catch + _:Err -> + rabbit_log:error( + "Failed to fetch number of connections in vhost ~p:~n~p~n", + [VirtualHost, Err]), + 0 + end. + +%% Returns a #tracked_connection from connection_created +%% event details. +%% +%% @see rabbit_connection_tracking_handler. +tracked_connection_from_connection_created(EventDetails) -> + %% Example event: + %% + %% [{type,network}, + %% {pid,<0.329.0>}, + %% {name,<<"127.0.0.1:60998 -> 127.0.0.1:5672">>}, + %% {port,5672}, + %% {peer_port,60998}, + %% {host,{0,0,0,0,0,65535,32512,1}}, + %% {peer_host,{0,0,0,0,0,65535,32512,1}}, + %% {ssl,false}, + %% {peer_cert_subject,''}, + %% {peer_cert_issuer,''}, + %% {peer_cert_validity,''}, + %% {auth_mechanism,<<"PLAIN">>}, + %% {ssl_protocol,''}, + %% {ssl_key_exchange,''}, + %% {ssl_cipher,''}, + %% {ssl_hash,''}, + %% {protocol,{0,9,1}}, + %% {user,<<"guest">>}, + %% {vhost,<<"/">>}, + %% {timeout,14}, + %% {frame_max,131072}, + %% {channel_max,65535}, + %% {client_properties, + %% [{<<"capabilities">>,table, + %% [{<<"publisher_confirms">>,bool,true}, + %% {<<"consumer_cancel_notify">>,bool,true}, + %% {<<"exchange_exchange_bindings">>,bool,true}, + %% {<<"basic.nack">>,bool,true}, + %% {<<"connection.blocked">>,bool,true}, + %% {<<"authentication_failure_close">>,bool,true}]}, + %% {<<"product">>,longstr,<<"Bunny">>}, + %% {<<"platform">>,longstr, + %% <<"ruby 2.3.0p0 (2015-12-25 revision 53290) [x86_64-darwin15]">>}, + %% {<<"version">>,longstr,<<"2.3.0.pre">>}, + %% {<<"information">>,longstr, + %% <<"http://rubybunny.info">>}]}, + %% {connected_at,1453214290847}] + Name = proplists:get_value(name, EventDetails), + Node = proplists:get_value(node, EventDetails), + #tracked_connection{id = {Node, Name}, + name = Name, + node = Node, + vhost = proplists:get_value(vhost, EventDetails), + username = proplists:get_value(user, EventDetails), + connected_at = proplists:get_value(connected_at, EventDetails), + pid = proplists:get_value(pid, EventDetails), + peer_host = proplists:get_value(peer_host, EventDetails), + peer_port = proplists:get_value(peer_port, EventDetails)}. + +tracked_connection_from_connection_state(#connection{ + vhost = VHost, + connected_at = Ts, + peer_host = PeerHost, + peer_port = PeerPort, + user = Username, + name = Name + }) -> + tracked_connection_from_connection_created( + [{name, Name}, + {node, node()}, + {vhost, VHost}, + {user, Username}, + {connected_at, Ts}, + {pid, self()}, + {peer_port, PeerPort}, + {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl new file mode 100644 index 0000000000..deaf9bc7f6 --- /dev/null +++ b/src/rabbit_connection_tracking_handler.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_connection_tracking_handler). + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +%% This module keeps track of connection creation and termination events +%% on its local node. The primary goal here is to decouple connection +%% tracking from rabbit_reader in rabbit_common. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "connection tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [rabbit_event, rabbit_node_monitor]}, + {enables, recovery}]}). + + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = connection_created, props = Details}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_created(Details) + ), + {ok, State}; +%% see rabbit_reader +handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) -> + rabbit_connection_tracking:register_connection( + rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState) + ), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, + %% {pid,<0.1774.0>}, + %% {node, rabbit@hostname}] + rabbit_connection_tracking:unregister_connection( + {proplists:get_value(node, Details), + proplists:get_value(name, Details)}), + {ok, State}; +handle_event(#event{type = vhost_deleted, props = Details}, State) -> + VHost = proplists:get_value(name, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), + case rabbit_connection_tracking:list(VHost) of + [] -> {ok, State}; + Cs -> + [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || #tracked_connection{pid = Pid} <- Cs], + {ok, State} + end; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Username = proplists:get_value(name, Details), + %% TODO: force close and unregister connections from + %% this user. Moved to rabbitmq/rabbitmq-server#628. + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 7f410ac752..c56e519fe5 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -74,6 +74,9 @@ {clear_policy, [?VHOST_DEF]}, {list_policies, [?VHOST_DEF]}, + {set_vhost_limits, [?VHOST_DEF]}, + {clear_vhost_limits, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -544,6 +547,18 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); +action(set_vhost_limits, Node, [Defn], Opts, Inform) -> + Msg = "Setting vhost limits for vhost ~p", + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(Msg, [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, parse_set, [VHostArg, Defn]), + ok; + +action(clear_vhost_limits, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing vhost ~p limits", [VHostArg]), + rpc_call(Node, rabbit_vhost_limit, clear, [VHostArg]); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 26a864f0f5..43d268c374 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -466,12 +466,14 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> {[], true, disc} -> %% First disc node up maybe_force_load(), + ok = rabbit_table:ensure_secondary_indices(), ok; {[_ | _], _, _} -> %% Subsequent node in cluster, catch up maybe_force_load(), ok = rabbit_table:wait_for_replicated(), - ok = rabbit_table:create_local_copy(NodeType) + ok = rabbit_table:create_local_copy(NodeType), + ok = rabbit_table:ensure_secondary_indices() end, ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0322aacfd1..9ed790c75d 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -732,6 +732,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node), + ok = rabbit_connection_tracking:on_node_down(Node), %% If we have been partitioned, and we are now in the only remaining %% partition, we no longer care about partitions - forget them. Note %% that we do not attempt to deal with individual (other) partitions @@ -760,7 +761,8 @@ ensure_keepalive_timer(State) -> handle_live_rabbit(Node) -> ok = rabbit_amqqueue:on_node_up(Node), ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). + ok = rabbit_mnesia:on_node_up(Node), + ok = rabbit_connection_tracking:on_node_up(Node). maybe_autoheal(State = #state{partitions = []}) -> State; diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 3909096964..60996c1539 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -18,7 +18,8 @@ -export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, force_load/0, is_present/0, is_empty/0, needs_default_data/0, - check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]). + check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0, + ensure_secondary_indices/0, ensure_secondary_indices/2]). -include("rabbit.hrl"). @@ -50,6 +51,7 @@ create() -> Tab, TabDef1, Reason}}) end end, definitions()), + ok = rabbit_table:ensure_secondary_indices(), ok. %% The sequence in which we delete the schema and then the other @@ -63,6 +65,13 @@ create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). +ensure_secondary_indices() -> + ensure_secondary_indices(rabbit_tracked_connection, [vhost, username]), + ok. + +ensure_secondary_indices(Tab, Fields) -> + [mnesia:add_table_index(Tab, Field) || Field <- Fields]. + wait_for_replicated() -> wait([Tab || {Tab, TabDef} <- definitions(), not lists:member({local_content, true}, TabDef)]). @@ -297,9 +306,24 @@ definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}] - ++ gm:table_definitions() - ++ mirrored_supervisor:table_definitions(). + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + + %% Used to track connections across virtual hosts + %% e.g. so that limits can be enforced. + %% + %% All data in this table is transient. + {rabbit_tracked_connection, + [{record_name, tracked_connection}, + {attributes, record_info(fields, tracked_connection)}, + {match, #tracked_connection{_ = '_'}}]}, + + {rabbit_tracked_connection_per_vhost, + [{record_name, tracked_connection_per_vhost}, + {attributes, record_info(fields, tracked_connection_per_vhost)}, + {match, #tracked_connection_per_vhost{_ = '_'}}]} + + ] ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 3d624752ea..47053fafe7 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -55,6 +55,9 @@ -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). +-rabbit_upgrade({vhost_limits, mnesia, []}). +-rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}). +-rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}). %% ------------------------------------------------------------------- @@ -88,9 +91,36 @@ -spec queue_state() -> 'ok'. -spec recoverable_slaves() -> 'ok'. -spec user_password_hashing() -> 'ok'. +-spec vhost_limits() -> 'ok'. +-spec tracked_connection() -> 'ok'. +-spec tracked_connection_per_vhost() -> 'ok'. + %%-------------------------------------------------------------------- +tracked_connection() -> + create(rabbit_tracked_connection, [{record_name, tracked_connection}, + {attributes, [id, node, vhost, name, + pid, protocol, + peer_host, peer_port, + username, connected_at]}]). + +tracked_connection_per_vhost() -> + create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost}, + {attributes, [vhost, connection_count]}]). + +%% replaces vhost.dummy (used to avoid having a single-field record +%% which Mnesia doesn't like) with vhost.limits (which is actually +%% used) +vhost_limits() -> + io:format("vhost_limits vhost_limits vhost_limits~n"), + transform( + rabbit_vhost, + fun ({vhost, VHost, _Dummy}) -> + {vhost, VHost, undefined} + end, + [virtual_host, limits]). + %% It's a bad idea to use records or record_info here, even for the %% destination form. Because in the future, the destination form of %% your current transform may not match the record any more, and it @@ -510,6 +540,11 @@ create(Tab, TabDef) -> {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. +create(Tab, TabDef, SecondaryIndices) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + [mnesia:add_table_index(Tab, Idx) || Idx <- SecondaryIndices], + ok. + %% Dumb replacement for rabbit_exchange:declare that does not require %% the exchange type registry or worker pool to be running by dint of %% not validating anything and assuming the exchange type does not @@ -518,3 +553,7 @@ create(Tab, TabDef) -> declare_exchange(XName, Type) -> X = {exchange, XName, Type, true, false, false, []}, ok = mnesia:dirty_write(rabbit_durable_exchange, X). + +add_indices(Tab, FieldList) -> + [mnesia:add_table_index(Tab, Field) || Field <- FieldList], + ok. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index df2f8423b4..01f1046fb8 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -20,11 +20,14 @@ %%---------------------------------------------------------------------------- --export([add/1, delete/1, exists/1, list/0, with/2, assert/1]). +-export([add/1, delete/1, exists/1, list/0, with/2, assert/1, update/2, + set_limits/2, limits_of/1]). -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). + -spec add(rabbit_types:vhost()) -> 'ok'. -spec delete(rabbit_types:vhost()) -> 'ok'. +-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. -spec with(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. @@ -138,6 +141,32 @@ assert(VHostPath) -> case exists(VHostPath) of false -> throw({error, {no_such_vhost, VHostPath}}) end. +update(VHostPath, Fun) -> + case mnesia:read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [V] -> + V1 = Fun(V), + ok = mnesia:write(rabbit_vhost, V1, write), + V1 + end. + +limits_of(VHostPath) when is_binary(VHostPath) -> + assert(VHostPath), + case mnesia:dirty_read({rabbit_vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [#vhost{limits = Limits}] -> + Limits + end; +limits_of(#vhost{virtual_host = Name}) -> + limits_of(Name). + +set_limits(VHost = #vhost{}, undefined) -> + VHost#vhost{limits = undefined}; +set_limits(VHost = #vhost{}, Limits) -> + VHost#vhost{limits = Limits}. + %%---------------------------------------------------------------------------- infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl new file mode 100644 index 0000000000..dd7ce51089 --- /dev/null +++ b/src/rabbit_vhost_limit.erl @@ -0,0 +1,98 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_limit). + +-behaviour(rabbit_runtime_parameter). + +-include("rabbit.hrl"). + +-export([register/0]). +-export([parse_set/2, clear/1]). +-export([validate/5, notify/4, notify_clear/3]). +-export([connection_limit/1]). + +-import(rabbit_misc, [pget/2]). + +-rabbit_boot_step({?MODULE, + [{description, "vhost limit parameters"}, + {mfa, {rabbit_vhost_limit, register, []}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +%%---------------------------------------------------------------------------- + +register() -> + rabbit_registry:register(runtime_parameter, <<"vhost-limits">>, ?MODULE). + +validate(_VHost, <<"vhost-limits">>, Name, Term, _User) -> + rabbit_parameter_validation:proplist( + Name, vhost_limit_validation(), Term). + +notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits) -> + rabbit_event:notify(vhost_limits_set, [{name, <<"limits">>} | Limits]), + update_vhost(VHost, Limits). + +notify_clear(VHost, <<"vhost-limits">>, <<"limits">>) -> + rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>}]), + update_vhost(VHost, undefined). + +connection_limit(VirtualHost) -> + get_limit(VirtualHost, <<"max-connections">>). + +%%---------------------------------------------------------------------------- + +parse_set(VHost, Defn) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set(VHost, rabbit_misc:json_to_term(JSON)); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Defn) -> + rabbit_runtime_parameters:set_any(VHost, <<"vhost-limits">>, + <<"limits">>, Defn, none). + +clear(VHost) -> + rabbit_runtime_parameters:clear_any(VHost, <<"vhost-limits">>, + <<"limits">>). + +vhost_limit_validation() -> + [{<<"max-connections">>, fun rabbit_parameter_validation:number/2, mandatory}]. + +update_vhost(VHostName, Limits) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> + rabbit_vhost:update(VHostName, + fun(VHost) -> + rabbit_vhost:set_limits(VHost, Limits) + end) + end), + ok. + +get_limit(VirtualHost, Limit) -> + case rabbit_runtime_parameters:list(VirtualHost, <<"vhost-limits">>) of + [] -> undefined; + [Param] -> case pget(value, Param) of + undefined -> undefined; + Val -> case pget(Limit, Val) of + undefined -> undefined; + N when N =< 0 -> undefined; + N when N > 0 -> {ok, N} + end + end + end. diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl new file mode 100644 index 0000000000..0b1f5adf84 --- /dev/null +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -0,0 +1,660 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_connection_limit_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2, + open_unmanaged_connection/3]). + + +all() -> + [ + {group, cluster_size_1}, + {group, cluster_size_2} + ]. + +groups() -> + [ + {cluster_size_1, [], [ + most_basic_single_node_connection_count_test, + single_node_single_vhost_connection_count_test, + single_node_multiple_vhost_connection_count_test, + single_node_list_in_vhost_test, + single_node_connection_reregistration_idempotency_test, + single_node_single_vhost_limit_test, + single_node_multiple_vhost_limit_test, + single_node_vhost_deletion_forces_connection_closure_test + ]}, + {cluster_size_2, [], [ + most_basic_cluster_connection_count_test, + cluster_single_vhost_connection_count_test, + cluster_multiple_vhost_connection_count_test, + cluster_node_restart_connection_count_test, + cluster_node_list_on_node_test, + cluster_connection_reregistration_idempotency_test, + cluster_single_vhost_limit_test + ]} + ]. + +%% see partitions_SUITE +-define(DELAY, 9000). + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1, Config) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, 1}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); +init_per_group(cluster_size_2, Config) -> + init_per_multinode_group(cluster_size_2, Config, 2); +init_per_group(partition_handling, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]), + init_per_multinode_group(partition_handling, Config1, 3). + +init_per_multinode_group(_GroupName, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:setup_steps(), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:teardown_steps(), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +most_basic_single_node_connection_count_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + Conn = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + rabbit_ct_client_helpers:close_connection(Conn), + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +single_node_single_vhost_connection_count_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn2 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + + Conn3 = open_unmanaged_connection(Config, 0), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn4 = open_unmanaged_connection(Config, 0), + ?assertEqual(3, count_connections_in(Config, VHost)), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn5 = open_unmanaged_connection(Config, 0), + ?assertEqual(3, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn2, Conn3, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +single_node_multiple_vhost_connection_count_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(1, count_connections_in(Config, VHost1)), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + Conn2 = open_unmanaged_connection(Config, 0, VHost2), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + Conn3 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(1, count_connections_in(Config, VHost1)), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + Conn4 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(2, count_connections_in(Config, VHost1)), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + Conn5 = open_unmanaged_connection(Config, 0, VHost2), + ?assertEqual(2, count_connections_in(Config, VHost2)), + + Conn6 = open_unmanaged_connection(Config, 0, VHost2), + ?assertEqual(3, count_connections_in(Config, VHost2)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn2, Conn3, Conn5, Conn6]), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + + passed. + +single_node_list_in_vhost_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, length(connections_in(Config, VHost1))), + ?assertEqual(0, length(connections_in(Config, VHost2))), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, length(connections_in(Config, VHost1))), + + Conn2 = open_unmanaged_connection(Config, 0, VHost2), + [#tracked_connection{vhost = VHost2}] = connections_in(Config, VHost2), + + Conn3 = open_unmanaged_connection(Config, 0, VHost1), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + + Conn4 = open_unmanaged_connection(Config, 0, VHost1), + (catch exit(Conn4, please_terminate)), + [#tracked_connection{vhost = VHost1}] = connections_in(Config, VHost1), + + Conn5 = open_unmanaged_connection(Config, 0, VHost2), + Conn6 = open_unmanaged_connection(Config, 0, VHost2), + [<<"vhost1">>, <<"vhost2">>] = + lists:usort(lists:map(fun (#tracked_connection{vhost = V}) -> V end, + all_connections(Config))), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn2, Conn3, Conn5, Conn6]), + + ?assertEqual(0, length(all_connections(Config))), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + + passed. + +most_basic_cluster_connection_count_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + Conn1 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + + Conn2 = open_unmanaged_connection(Config, 1), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn3 = open_unmanaged_connection(Config, 1), + ?assertEqual(3, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +cluster_single_vhost_connection_count_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn2 = open_unmanaged_connection(Config, 1), + ?assertEqual(1, count_connections_in(Config, VHost)), + + Conn3 = open_unmanaged_connection(Config, 0), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn4 = open_unmanaged_connection(Config, 1), + ?assertEqual(3, count_connections_in(Config, VHost)), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn5 = open_unmanaged_connection(Config, 1), + ?assertEqual(3, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn2, Conn3, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +cluster_multiple_vhost_connection_count_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(1, count_connections_in(Config, VHost1)), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + Conn2 = open_unmanaged_connection(Config, 1, VHost2), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + Conn3 = open_unmanaged_connection(Config, 1, VHost1), + ?assertEqual(1, count_connections_in(Config, VHost1)), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + Conn4 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(2, count_connections_in(Config, VHost1)), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + Conn5 = open_unmanaged_connection(Config, 1, VHost2), + ?assertEqual(2, count_connections_in(Config, VHost2)), + + Conn6 = open_unmanaged_connection(Config, 0, VHost2), + ?assertEqual(3, count_connections_in(Config, VHost2)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn2, Conn3, Conn5, Conn6]), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + + passed. + +cluster_node_restart_connection_count_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, count_connections_in(Config, VHost)), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn2 = open_unmanaged_connection(Config, 1), + ?assertEqual(1, count_connections_in(Config, VHost)), + + Conn3 = open_unmanaged_connection(Config, 0), + ?assertEqual(2, count_connections_in(Config, VHost)), + + Conn4 = open_unmanaged_connection(Config, 1), + ?assertEqual(3, count_connections_in(Config, VHost)), + + Conn5 = open_unmanaged_connection(Config, 1), + ?assertEqual(4, count_connections_in(Config, VHost)), + + rabbit_ct_broker_helpers:restart_broker(Config, 1), + ?assertEqual(1, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + (catch rabbit_ct_client_helpers:close_connection(C)) + end, [Conn2, Conn3, Conn4, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +cluster_node_list_on_node_test(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + ?assertEqual(0, length(all_connections(Config))), + ?assertEqual(0, length(connections_on_node(Config, 0))), + + Conn1 = open_unmanaged_connection(Config, 0), + [#tracked_connection{node = A}] = connections_on_node(Config, 0), + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, length(connections_on_node(Config, 0))), + + _Conn2 = open_unmanaged_connection(Config, 1), + [#tracked_connection{node = B}] = connections_on_node(Config, 1), + + Conn3 = open_unmanaged_connection(Config, 0), + ?assertEqual(1, length(connections_on_node(Config, 0))), + + Conn4 = open_unmanaged_connection(Config, 1), + ?assertEqual(2, length(connections_on_node(Config, 1))), + + (catch exit(Conn4, please_terminate)), + ?assertEqual(1, length(connections_on_node(Config, 1))), + + Conn5 = open_unmanaged_connection(Config, 0), + ?assertEqual(2, length(connections_on_node(Config, 0))), + + rabbit_ct_broker_helpers:stop_broker(Config, 1), + ?assertEqual(2, length(all_connections(Config))), + ?assertEqual(0, length(connections_on_node(Config, 0, B))), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn3, Conn5]), + + timer:sleep(100), + ?assertEqual(0, length(all_connections(Config, 0))), + + rabbit_ct_broker_helpers:start_broker(Config, 1), + + passed. + +single_node_connection_reregistration_idempotency_test(Config) -> + VHost = <<"/">>, + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + Conn2 = open_unmanaged_connection(Config, 0), + Conn3 = open_unmanaged_connection(Config, 0), + Conn4 = open_unmanaged_connection(Config, 0), + Conn5 = open_unmanaged_connection(Config, 0), + + ?assertEqual(5, count_connections_in(Config, VHost)), + + reregister_connections_on(Config, 0), + timer:sleep(100), + + ?assertEqual(5, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3, Conn4, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +cluster_connection_reregistration_idempotency_test(Config) -> + VHost = <<"/">>, + + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + Conn2 = open_unmanaged_connection(Config, 1), + Conn3 = open_unmanaged_connection(Config, 0), + Conn4 = open_unmanaged_connection(Config, 1), + Conn5 = open_unmanaged_connection(Config, 1), + + ?assertEqual(5, count_connections_in(Config, VHost)), + + reregister_connections_on(Config, 0), + reregister_connections_on(Config, 1), + timer:sleep(100), + + ?assertEqual(5, count_connections_in(Config, VHost)), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3, Conn4, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + passed. + +single_node_single_vhost_limit_test(Config) -> + VHost = <<"/">>, + set_vhost_connection_limit(Config, VHost, 3), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0), + Conn2 = open_unmanaged_connection(Config, 0), + Conn3 = open_unmanaged_connection(Config, 0), + + %% we've crossed the limit + {error, not_allowed} = open_unmanaged_connection(Config, 0), + {error, not_allowed} = open_unmanaged_connection(Config, 0), + {error, not_allowed} = open_unmanaged_connection(Config, 0), + + set_vhost_connection_limit(Config, VHost, 5), + Conn4 = open_unmanaged_connection(Config, 0), + Conn5 = open_unmanaged_connection(Config, 0), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3, Conn4, Conn5]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + set_vhost_connection_limit(Config, VHost, 0), + + passed. + +single_node_multiple_vhost_limit_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + set_vhost_connection_limit(Config, VHost1, 2), + set_vhost_connection_limit(Config, VHost2, 2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + Conn2 = open_unmanaged_connection(Config, 0, VHost1), + Conn3 = open_unmanaged_connection(Config, 0, VHost2), + Conn4 = open_unmanaged_connection(Config, 0, VHost2), + + %% we've crossed the limit + {error, not_allowed} = open_unmanaged_connection(Config, 0, VHost1), + {error, not_allowed} = open_unmanaged_connection(Config, 0, VHost2), + + Conn5 = open_unmanaged_connection(Config, 0), + + set_vhost_connection_limit(Config, VHost1, 5), + set_vhost_connection_limit(Config, VHost2, 5), + + Conn6 = open_unmanaged_connection(Config, 0, VHost1), + Conn7 = open_unmanaged_connection(Config, 0, VHost1), + Conn8 = open_unmanaged_connection(Config, 0, VHost1), + Conn9 = open_unmanaged_connection(Config, 0, VHost2), + Conn10 = open_unmanaged_connection(Config, 0, VHost2), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3, Conn4, Conn5, + Conn6, Conn7, Conn8, Conn9, Conn10]), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + set_vhost_connection_limit(Config, VHost1, 100000000), + set_vhost_connection_limit(Config, VHost2, 100000000), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + + passed. + +cluster_single_vhost_limit_test(Config) -> + VHost = <<"/">>, + set_vhost_connection_limit(Config, VHost, 2), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost), + Conn2 = open_unmanaged_connection(Config, 1, VHost), + + %% we've crossed the limit + {error, not_allowed} = open_unmanaged_connection(Config, 0, VHost), + {error, not_allowed} = open_unmanaged_connection(Config, 1, VHost), + + set_vhost_connection_limit(Config, VHost, 5), + + Conn3 = open_unmanaged_connection(Config, 0, VHost), + Conn4 = open_unmanaged_connection(Config, 0, VHost), + + lists:foreach(fun (C) -> + rabbit_ct_client_helpers:close_connection(C) + end, [Conn1, Conn2, Conn3, Conn4]), + + ?assertEqual(0, count_connections_in(Config, VHost)), + + set_vhost_connection_limit(Config, VHost, 0), + + passed. + +single_node_vhost_deletion_forces_connection_closure_test(Config) -> + VHost1 = <<"vhost1">>, + VHost2 = <<"vhost2">>, + + set_up_vhost(Config, VHost1), + set_up_vhost(Config, VHost2), + + ?assertEqual(0, count_connections_in(Config, VHost1)), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + Conn1 = open_unmanaged_connection(Config, 0, VHost1), + ?assertEqual(1, count_connections_in(Config, VHost1)), + + _Conn2 = open_unmanaged_connection(Config, 0, VHost2), + ?assertEqual(1, count_connections_in(Config, VHost2)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, VHost2)), + + rabbit_ct_client_helpers:close_connection(Conn1), + ?assertEqual(0, count_connections_in(Config, VHost1)), + + rabbit_ct_broker_helpers:delete_vhost(Config, VHost1), + + passed. + + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +count_connections_in(Config, VHost) -> + count_connections_in(Config, VHost, 0). +count_connections_in(Config, VHost, NodeIndex) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + count_connections_in, [VHost]). + +connections_in(Config, VHost) -> + connections_in(Config, 0, VHost). +connections_in(Config, NodeIndex, VHost) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, [VHost]). + +connections_on_node(Config) -> + connections_on_node(Config, 0). +connections_on_node(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [Node]). +connections_on_node(Config, NodeIndex, NodeForListing) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [NodeForListing]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, []). + +reregister_connections_on(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracker, + reregister, + [Node]). + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost). + +set_vhost_connection_limit(Config, VHost, Count) -> + set_vhost_connection_limit(Config, 0, VHost, Count). + +set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:control_action( + set_vhost_limits, Node, + ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], + [{"-p", binary_to_list(VHost)}]). diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl new file mode 100644 index 0000000000..6ef6a25496 --- /dev/null +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -0,0 +1,164 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved. +%% + +-module(per_vhost_connection_limit_partitions_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2, + open_unmanaged_connection/3]). + + +all() -> + [ + {group, net_ticktime_1} + ]. + +groups() -> + [ + {net_ticktime_1, [], [ + cluster_full_partition_with_autoheal_test + ]} + ]. + +%% see partitions_SUITE +-define(DELAY, 12000). + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(net_ticktime_1 = GroupName, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]), + init_per_multinode_group(GroupName, Config1, 3). + +init_per_multinode_group(_GroupName, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix}, + {rmq_nodes_clustered, false} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ [ + fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1 + ]). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:setup_steps(), + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_client_helpers:teardown_steps(), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +cluster_full_partition_with_autoheal_test(Config) -> + VHost = <<"/">>, + rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal), + + ?assertEqual(0, count_connections_in(Config, VHost)), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% 6 connections, 2 per node + Conn1 = open_unmanaged_connection(Config, A), + Conn2 = open_unmanaged_connection(Config, A), + Conn3 = open_unmanaged_connection(Config, B), + Conn4 = open_unmanaged_connection(Config, B), + Conn5 = open_unmanaged_connection(Config, C), + Conn6 = open_unmanaged_connection(Config, C), + ?assertEqual(6, count_connections_in(Config, VHost)), + + %% B drops off the network, non-reachable by either A or C + rabbit_ct_broker_helpers:block_traffic_between(A, B), + rabbit_ct_broker_helpers:block_traffic_between(B, C), + timer:sleep(?DELAY), + + %% A and C are still connected, so 4 connections are tracked + ?assertEqual(4, count_connections_in(Config, VHost)), + + rabbit_ct_broker_helpers:allow_traffic_between(A, B), + rabbit_ct_broker_helpers:allow_traffic_between(B, C), + timer:sleep(?DELAY), + + %% during autoheal B's connections were dropped + ?assertEqual(4, count_connections_in(Config, VHost)), + + lists:foreach(fun (Conn) -> + (catch rabbit_ct_client_helpers:close_connection(Conn)) + end, [Conn1, Conn2, Conn3, Conn4, + Conn5, Conn6]), + + passed. + + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +count_connections_in(Config, VHost) -> + count_connections_in(Config, VHost, 0). +count_connections_in(Config, VHost, NodeIndex) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + count_connections_in, [VHost]). + +connections_in(Config, VHost) -> + connections_in(Config, 0, VHost). +connections_in(Config, NodeIndex, VHost) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, [VHost]). + +connections_on_node(Config) -> + connections_on_node(Config, 0). +connections_on_node(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [Node]). +connections_on_node(Config, NodeIndex, NodeForListing) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list_on_node, [NodeForListing]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_connection_tracking, + list, []). |
