summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-08-02 06:06:09 -0700
committerMichael Klishin <mklishin@pivotal.io>2016-08-02 06:06:09 -0700
commit747b41a9d3794706390b21003eb4bf6929422213 (patch)
tree0dac98f38ce913e31e06fc7deccf0fa69951acaf
parent995c430fbb1981d81f8c18740153c611c49fda4d (diff)
downloadrabbitmq-server-git-747b41a9d3794706390b21003eb4bf6929422213.tar.gz
Switch to a single table per node for connection tracking
This way we have a single writer, multiple readers and no lost counter update due to natural race conditions. Data for tables from available nodes are then aggregated. Two open questions: * Whether to use a parallel version of lists:map/2 * When to clean up the tables
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_connection_tracker.erl99
-rw-r--r--src/rabbit_connection_tracking.erl186
-rw-r--r--src/rabbit_connection_tracking_handler.erl38
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_node_monitor.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl115
8 files changed, 215 insertions, 250 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 59ede3c802..5db2c40b66 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -178,9 +178,9 @@
{mfa, {rabbit_direct, boot, []}},
{requires, log_relay}]}).
--rabbit_boot_step({connection_tracker,
- [{description, "helps track node-local connections"},
- {mfa, {rabbit_connection_tracker, boot, []}},
+-rabbit_boot_step({connection_tracking,
+ [{description, "sets up internal storage for node-local connections"},
+ {mfa, {rabbit_connection_tracking, boot, []}},
{requires, log_relay}]}).
-rabbit_boot_step({networking,
diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl
deleted file mode 100644
index 0177627d83..0000000000
--- a/src/rabbit_connection_tracker.erl
+++ /dev/null
@@ -1,99 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_connection_tracker).
-
-%% Abstracts away how tracked connection records are stored
-%% and queried.
-%%
-%% See also:
-%%
-%% * rabbit_connection_tracking_handler
-%% * rabbit_reader
-%% * rabbit_event
-
--behaviour(gen_server2).
-
-%% API
--export([boot/0, start_link/0, reregister/1]).
-
-%% gen_fsm callbacks
--export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
-
--define(SERVER, ?MODULE).
-
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-boot() ->
- {ok, _} = start_link(),
- ok.
-
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-reregister(Node) ->
- rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]),
- gen_server2:cast({?SERVER, Node}, reregister).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-init([]) ->
- {ok, {}}.
-
-handle_call(_Req, _From, State) ->
- {noreply, State}.
-
-handle_cast(reregister, State) ->
- Cs = rabbit_networking:connections_local(),
- rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]),
- case Cs of
- [] -> ok;
- Cs ->
- [reregister_connection(C) || C <- Cs],
- ok
- end,
- rabbit_log:info("Done re-registering client connections"),
- {noreply, State}.
-
-handle_info(_Req, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-
-reregister_connection(Conn) ->
- try
- Conn ! reregister
- catch _:Error ->
- rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error])
- end.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index f059c2698b..17e2d79034 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -25,34 +25,117 @@
%% * rabbit_reader
%% * rabbit_event
--export([register_connection/1, unregister_connection/1,
+-export([boot/0,
+ ensure_tracked_connections_table_for_node/1,
+ ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_tracked_connections_table_for_this_node/0,
+ ensure_per_vhost_tracked_connections_table_for_this_node/0,
+ tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1,
+ clear_tracked_connections_table_for_this_node/0,
+ register_connection/1, unregister_connection/1,
list/0, list/1, list_on_node/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- is_over_connection_limit/1, count_connections_in/1,
- on_node_down/1, on_node_up/1]).
+ is_over_connection_limit/1, count_connections_in/1]).
-include_lib("rabbit.hrl").
--define(TABLE, rabbit_tracked_connection).
--define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost).
--define(SERVER, ?MODULE).
-
%%
%% API
%%
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Created a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Created a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ clear_tracked_connections_table_for_this_node(),
+ ok.
+
+
+-spec ensure_tracked_connections_table_for_this_node() -> ok.
+
+ensure_tracked_connections_table_for_this_node() ->
+ ensure_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_this_node() ->
+ ensure_per_vhost_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection},
+ {attributes, [id, node, vhost, name,
+ pid, protocol,
+ peer_host, peer_port,
+ username, connected_at]}]) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ %% TODO: propagate errors
+ end.
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
+ {attributes, [vhost, connection_count]}]) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ %% TODO: propagate errors
+ end.
+
+
+-spec clear_tracked_connections_table_for_this_node() -> ok.
+
+clear_tracked_connections_table_for_this_node() ->
+ case mnesia:clear_table(tracked_connection_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end,
+ case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end.
+
+
+-spec tracked_connection_table_name_for(node()) -> atom().
+
+tracked_connection_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])).
+
+-spec tracked_connection_per_vhost_table_name_for(node()) -> atom().
+
+tracked_connection_per_vhost_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+
+
-spec register_connection(rabbit_types:tracked_connection()) -> ok.
-register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
+register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
rabbit_misc:execute_mnesia_transaction(
fun() ->
%% upsert
- case mnesia:dirty_read(?TABLE, ConnId) of
+ case mnesia:dirty_read(TableName, ConnId) of
[] ->
- mnesia:write(?TABLE, Conn, write),
+ %% TODO: counter table
+ mnesia:write(TableName, Conn, write),
mnesia:dirty_update_counter(
- ?PER_VHOST_COUNTER_TABLE, VHost, 1);
+ PerVhostTableName, VHost, 1);
[_Row] ->
ok
end,
@@ -61,16 +144,17 @@ register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
-spec unregister_connection(rabbit_types:connection_name()) -> ok.
-unregister_connection(ConnId = {_Node, _Name}) ->
+unregister_connection(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:dirty_read(?TABLE, ConnId) of
- [] -> ok;
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
[Row] ->
mnesia:dirty_update_counter(
- ?PER_VHOST_COUNTER_TABLE,
- Row#tracked_connection.vhost, -1),
- mnesia:delete({?TABLE, ConnId})
+ PerVhostTableName, Row#tracked_connection.vhost, -1),
+ mnesia:delete({TableName, ConnId})
end
end).
@@ -78,40 +162,33 @@ unregister_connection(ConnId = {_Node, _Name}) ->
-spec list() -> [rabbit_types:tracked_connection()].
list() ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}).
+ Chunks = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_table_name_for(Node),
+ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(Chunk, Acc) -> Acc ++ Chunk end, [], Chunks).
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
list(VHost) ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}).
+ Chunks = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_table_name_for(Node),
+ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'})
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(Chunk, Acc) -> Acc ++ Chunk end, [], Chunks).
-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
list_on_node(Node) ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}).
-
-
--spec on_node_down(node()) -> ok.
-
-on_node_down(Node) ->
- case lists:member(Node, nodes()) of
- false ->
- Cs = list_on_node(Node),
- rabbit_log:info(
- "Node ~p is down, unregistering ~p client connections~n",
- [Node, length(Cs)]),
- [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs],
- ok;
- true -> rabbit_log:info(
- "Keeping ~s connections: the node is already back~n", [Node])
- end.
-
--spec on_node_up(node()) -> ok.
-on_node_up(Node) ->
- rabbit_connection_tracker:reregister(Node),
- ok.
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
@@ -133,19 +210,24 @@ is_over_connection_limit(VirtualHost) ->
count_connections_in(VirtualHost) ->
try
- case mnesia:transaction(
+ Ns = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_per_vhost_table_name_for(Node),
+ try
+ case mnesia:transaction(
fun() ->
- case mnesia:dirty_read(
- {?PER_VHOST_COUNTER_TABLE,
- VirtualHost}) of
- [] -> 0;
- [Val] ->
- Val#tracked_connection_per_vhost.connection_count
- end
+ case mnesia:dirty_read({Tab, VirtualHost}) of
+ [] -> 0;
+ [Val] -> Val#tracked_connection_per_vhost.connection_count
+ end
end) of
- {atomic, Val} -> Val;
- {aborted, _Reason} -> 0
- end
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
+ end
+ catch _ -> 0
+ end
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(X, Acc) -> Acc + X end, 0, Ns)
catch
_:Err ->
rabbit_log:error(
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index deaf9bc7f6..91b38d2c39 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -53,23 +53,31 @@ init([]) ->
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->
- rabbit_connection_tracking:register_connection(
- rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
- ),
- {ok, State};
-%% see rabbit_reader
-handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) ->
- rabbit_connection_tracking:register_connection(
- rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState)
- ),
+ ThisNode = node(),
+ case proplists:get_value(node, Details) of
+ ThisNode ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
+ );
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
- %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
- %% {pid,<0.1774.0>},
- %% {node, rabbit@hostname}]
- rabbit_connection_tracking:unregister_connection(
- {proplists:get_value(node, Details),
- proplists:get_value(name, Details)}),
+ ThisNode = node(),
+ case proplists:get_value(node, Details) of
+ ThisNode ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ rabbit_connection_tracking:unregister_connection(
+ {proplists:get_value(node, Details),
+ proplists:get_value(name, Details)});
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
{ok, State};
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = proplists:get_value(name, Details),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 43d268c374..88ea3a80f5 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -148,6 +148,7 @@ auto_cluster(TryNodes, NodeType) ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning(
@@ -194,6 +195,7 @@ join_cluster(DiscoveryNode, NodeType) ->
[ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType,
true, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster(),
ok;
{error, Reason} ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 9ed790c75d..0322aacfd1 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -732,7 +732,6 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
- ok = rabbit_connection_tracking:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -761,8 +760,7 @@ ensure_keepalive_timer(State) ->
handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node),
- ok = rabbit_connection_tracking:on_node_up(Node).
+ ok = rabbit_mnesia:on_node_up(Node).
maybe_autoheal(State = #state{partitions = []}) ->
State;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 47053fafe7..7efa36d637 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -56,8 +56,6 @@
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
--rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}).
--rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}).
%% -------------------------------------------------------------------
@@ -92,23 +90,10 @@
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
-spec vhost_limits() -> 'ok'.
--spec tracked_connection() -> 'ok'.
--spec tracked_connection_per_vhost() -> 'ok'.
%%--------------------------------------------------------------------
-tracked_connection() ->
- create(rabbit_tracked_connection, [{record_name, tracked_connection},
- {attributes, [id, node, vhost, name,
- pid, protocol,
- peer_host, peer_port,
- username, connected_at]}]).
-
-tracked_connection_per_vhost() ->
- create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost},
- {attributes, [vhost, connection_count]}]).
-
%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
%% used)
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index 0b1f5adf84..6b9fb3ec29 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -39,7 +39,6 @@ groups() ->
single_node_single_vhost_connection_count_test,
single_node_multiple_vhost_connection_count_test,
single_node_list_in_vhost_test,
- single_node_connection_reregistration_idempotency_test,
single_node_single_vhost_limit_test,
single_node_multiple_vhost_limit_test,
single_node_vhost_deletion_forces_connection_closure_test
@@ -50,8 +49,8 @@ groups() ->
cluster_multiple_vhost_connection_count_test,
cluster_node_restart_connection_count_test,
cluster_node_list_on_node_test,
- cluster_connection_reregistration_idempotency_test,
- cluster_single_vhost_limit_test
+ cluster_single_vhost_limit_test,
+ cluster_single_vhost_limit2_test
]}
].
@@ -102,13 +101,22 @@ end_per_group(_Group, Config) ->
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
rabbit_ct_client_helpers:setup_steps(),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
rabbit_ct_client_helpers:teardown_steps(),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracked_connections_table_for_this_node,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
%% -------------------------------------------------------------------
%% Test cases.
%% -------------------------------------------------------------------
@@ -400,6 +408,8 @@ cluster_node_list_on_node_test(Config) ->
?assertEqual(2, length(connections_on_node(Config, 0))),
rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ await_running_node_refresh(Config, 0),
+
?assertEqual(2, length(all_connections(Config))),
?assertEqual(0, length(connections_on_node(Config, 0, B))),
@@ -414,58 +424,6 @@ cluster_node_list_on_node_test(Config) ->
passed.
-single_node_connection_reregistration_idempotency_test(Config) ->
- VHost = <<"/">>,
- ?assertEqual(0, count_connections_in(Config, VHost)),
-
- Conn1 = open_unmanaged_connection(Config, 0),
- Conn2 = open_unmanaged_connection(Config, 0),
- Conn3 = open_unmanaged_connection(Config, 0),
- Conn4 = open_unmanaged_connection(Config, 0),
- Conn5 = open_unmanaged_connection(Config, 0),
-
- ?assertEqual(5, count_connections_in(Config, VHost)),
-
- reregister_connections_on(Config, 0),
- timer:sleep(100),
-
- ?assertEqual(5, count_connections_in(Config, VHost)),
-
- lists:foreach(fun (C) ->
- rabbit_ct_client_helpers:close_connection(C)
- end, [Conn1, Conn2, Conn3, Conn4, Conn5]),
-
- ?assertEqual(0, count_connections_in(Config, VHost)),
-
- passed.
-
-cluster_connection_reregistration_idempotency_test(Config) ->
- VHost = <<"/">>,
-
- ?assertEqual(0, count_connections_in(Config, VHost)),
-
- Conn1 = open_unmanaged_connection(Config, 0),
- Conn2 = open_unmanaged_connection(Config, 1),
- Conn3 = open_unmanaged_connection(Config, 0),
- Conn4 = open_unmanaged_connection(Config, 1),
- Conn5 = open_unmanaged_connection(Config, 1),
-
- ?assertEqual(5, count_connections_in(Config, VHost)),
-
- reregister_connections_on(Config, 0),
- reregister_connections_on(Config, 1),
- timer:sleep(100),
-
- ?assertEqual(5, count_connections_in(Config, VHost)),
-
- lists:foreach(fun (C) ->
- rabbit_ct_client_helpers:close_connection(C)
- end, [Conn1, Conn2, Conn3, Conn4, Conn5]),
-
- ?assertEqual(0, count_connections_in(Config, VHost)),
-
- passed.
-
single_node_single_vhost_limit_test(Config) ->
VHost = <<"/">>,
set_vhost_connection_limit(Config, VHost, 3),
@@ -549,8 +507,11 @@ cluster_single_vhost_limit_test(Config) ->
?assertEqual(0, count_connections_in(Config, VHost)),
+ %% here connections are opened to different nodes
Conn1 = open_unmanaged_connection(Config, 0, VHost),
Conn2 = open_unmanaged_connection(Config, 1, VHost),
+ %% give tracked connection rows some time to propagate in both directions
+ timer:sleep(200),
%% we've crossed the limit
{error, not_allowed} = open_unmanaged_connection(Config, 0, VHost),
@@ -571,6 +532,39 @@ cluster_single_vhost_limit_test(Config) ->
passed.
+cluster_single_vhost_limit2_test(Config) ->
+ VHost = <<"/">>,
+ set_vhost_connection_limit(Config, VHost, 2),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ %% here a limit is reached on one node first
+ Conn1 = open_unmanaged_connection(Config, 0, VHost),
+ Conn2 = open_unmanaged_connection(Config, 0, VHost),
+
+ %% we've crossed the limit
+ {error, not_allowed} = open_unmanaged_connection(Config, 0, VHost),
+
+ timer:sleep(200),
+ {error, not_allowed} = open_unmanaged_connection(Config, 1, VHost),
+
+ set_vhost_connection_limit(Config, VHost, 5),
+
+ Conn3 = open_unmanaged_connection(Config, 1, VHost),
+ Conn4 = open_unmanaged_connection(Config, 1, VHost),
+ Conn5 = open_unmanaged_connection(Config, 1, VHost),
+ {error, not_allowed} = open_unmanaged_connection(Config, 1, VHost),
+
+ lists:foreach(fun (C) ->
+ rabbit_ct_client_helpers:close_connection(C)
+ end, [Conn1, Conn2, Conn3, Conn4, Conn5]),
+
+ ?assertEqual(0, count_connections_in(Config, VHost)),
+
+ set_vhost_connection_limit(Config, VHost, 0),
+
+ passed.
+
single_node_vhost_deletion_forces_connection_closure_test(Config) ->
VHost1 = <<"vhost1">>,
VHost2 = <<"vhost2">>,
@@ -636,14 +630,6 @@ all_connections(Config, NodeIndex) ->
rabbit_connection_tracking,
list, []).
-reregister_connections_on(Config, NodeIndex) ->
- Node = rabbit_ct_broker_helpers:get_node_config(
- Config, NodeIndex, nodename),
- rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
- rabbit_connection_tracker,
- reregister,
- [Node]).
-
set_up_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:add_vhost(Config, VHost),
rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost).
@@ -658,3 +644,6 @@ set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
set_vhost_limits, Node,
["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
[{"-p", binary_to_list(VHost)}]).
+
+await_running_node_refresh(Config, NodeIndex) ->
+ timer:sleep(250).