summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda-D <ayanda.dube@erlang-solutions.com>2020-06-10 10:10:48 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-09-02 04:28:58 +0300
commit3ea173725611662a666c95c336c3f3d0d34ff74a (patch)
treee7510513f46580965a1e5365efcea456a8d2555f
parent275cecce2f9f429a77190712ad3f45afcbc31459 (diff)
downloadrabbitmq-server-git-3ea173725611662a666c95c336c3f3d0d34ff74a.tar.gz
Introduce per-user channel tracking
Make 'tracking_execution_timeout' configurable Add per_user_connection_channel_tracking_SUITE
-rw-r--r--Makefile4
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_channel_tracking.erl298
-rw-r--r--src/rabbit_channel_tracking_handler.erl80
-rw-r--r--src/rabbit_connection_tracking.erl5
-rw-r--r--test/per_user_connection_channel_tracking_SUITE.erl840
6 files changed, 1229 insertions, 3 deletions
diff --git a/Makefile b/Makefile
index 5f9373c9b3..a90fa79ac2 100644
--- a/Makefile
+++ b/Makefile
@@ -115,7 +115,9 @@ define PROJECT_ENV
%% Default max message size is 128 MB
{max_message_size, 134217728},
%% Socket writer will run GC every 1 GB of outgoing data
- {writer_gc_threshold, 1000000000}
+ {writer_gc_threshold, 1000000000},
+ %% interval at which connection/channel tracking executes post operations
+ {tracking_execution_timeout, 5000}
]
endef
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f58ae1db94..f057fb0da3 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,6 +199,11 @@
{mfa, {rabbit_connection_tracking, boot, []}},
{enables, routing_ready}]}).
+-rabbit_boot_step({channel_tracking,
+ [{description, "channel tracking infrastructure"},
+ {mfa, {rabbit_channel_tracking, boot, []}},
+ {enables, routing_ready}]}).
+
-rabbit_boot_step({background_gc,
[{description, "background garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl
new file mode 100644
index 0000000000..aac3b92e1b
--- /dev/null
+++ b/src/rabbit_channel_tracking.erl
@@ -0,0 +1,298 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_channel_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+-behaviour(rabbit_tracking).
+
+-export([boot/0,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([list/0, list_of_user/1, list_on_node/1,
+ tracked_channel_table_name_for/1,
+ tracked_channel_per_user_table_name_for/1,
+ get_all_tracked_channel_table_names_for_node/1,
+ delete_tracked_channel_user_entry/1]).
+
+-include_lib("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+%%
+%% API
+%%
+
+%% Sets up and resets channel tracking tables for this node.
+-spec boot() -> ok.
+
+boot() ->
+ ensure_tracked_channels_table_for_this_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_table_name_for(node())]),
+ ensure_per_user_tracked_channels_table_for_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
+
+-spec update_tracked(term()) -> ok.
+
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]).
+
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({channel_created, Details}) ->
+ ThisNode = node(),
+ case node(pget(pid, Details)) of
+ ThisNode ->
+ TrackedCh = #tracked_channel{id = TrackedChId} =
+ tracked_channel_from_channel_created_event(Details),
+ try
+ register_tracked(TrackedCh)
+ catch
+ error:{no_exists, _} ->
+ Msg = "Could not register channel ~p for tracking, "
+ "its table is not ready yet or the channel terminated prematurely",
+ rabbit_log_connection:warning(Msg, [TrackedChId]),
+ ok;
+ error:Err ->
+ Msg = "Could not register channel ~p for tracking: ~p",
+ rabbit_log_connection:warning(Msg, [TrackedChId, Err]),
+ ok
+ end;
+ _OtherNode ->
+ %% ignore
+ ok
+ end;
+handle_cast({channel_closed, Details}) ->
+ %% channel has terminated, unregister iff local
+ case get_tracked_channel_by_pid(pget(pid, Details)) of
+ [#tracked_channel{name = Name}] ->
+ unregister_tracked(rabbit_tracking:id(node(), Name));
+ _Other -> ok
+ end;
+handle_cast({connection_closed, ConnDetails}) ->
+ ThisNode= node(),
+ ConnPid = pget(pid, ConnDetails),
+
+ case pget(node, ConnDetails) of
+ ThisNode ->
+ TrackedChs = get_tracked_channels_by_connection_pid(ConnPid),
+ rabbit_log_connection:info(
+ "Closing all channels from connection '~p' "
+ "because it has been closed", [pget(name, ConnDetails)]),
+ shutdown_tracked_items(TrackedChs, undefined),
+ [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) ||
+ #tracked_channel{name = Name} <- TrackedChs],
+ ok;
+ _DifferentNode ->
+ ok
+ end;
+handle_cast({user_deleted, Details}) ->
+ Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_channel_user_entry, [Username]),
+ ok;
+handle_cast({node_deleted, Details}) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info(
+ "Node '~s' was removed from the cluster, deleting"
+ " its channel tracking tables...", [Node]),
+ delete_tracked_channels_table_for_node(Node),
+ delete_per_user_tracked_channels_table_for_node(Node).
+
+-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
+
+register_tracked(TrackedCh =
+ #tracked_channel{node = Node, name = Name, username = Username}) ->
+ ChId = rabbit_tracking:id(Node, Name),
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ChId) of
+ [] ->
+ mnesia:dirty_write(TableName, TrackedCh),
+ mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
+ [#tracked_channel{}] ->
+ ok
+ end,
+ ok.
+
+-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
+
+unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ChId) of
+ [] -> ok;
+ [#tracked_channel{username = Username}] ->
+ mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
+ mnesia:dirty_delete(TableName, ChId)
+ end.
+
+-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
+
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_channel_per_user_table_name_for/1,
+ #tracked_channel_per_user.channel_count, Username,
+ "channels in vhost").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_channel_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, _Args) ->
+ close_channels(TrackedItems).
+
+%% helper functions
+-spec list() -> [rabbit_types:tracked_channel()].
+
+list() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_channel_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
+-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
+
+list_of_user(Username) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{username = Username, _ = '_'}).
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
+
+list_on_node(Node) ->
+ try mnesia:dirty_match_object(
+ tracked_channel_table_name_for(Node),
+ #tracked_channel{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+-spec tracked_channel_table_name_for(node()) -> atom().
+
+tracked_channel_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).
+
+-spec tracked_channel_per_user_table_name_for(node()) -> atom().
+
+tracked_channel_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_channel_table_per_user_on_node_~s", [Node])).
+
+%% internal
+ensure_tracked_channels_table_for_this_node() ->
+ ensure_tracked_channels_table_for_node(node()).
+
+ensure_per_user_tracked_channels_table_for_node() ->
+ ensure_per_user_tracked_channels_table_for_node(node()).
+
+%% Create tables
+ensure_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel},
+ {attributes, record_info(fields, tracked_channel)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+ensure_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
+ {attributes, record_info(fields, tracked_channel_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+clear_tracked_channel_tables_for_this_node() ->
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_channel_table_names_for_node(node())].
+
+delete_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").
+
+delete_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node,
+ "per-user tracked channels").
+
+get_all_tracked_channel_table_names_for_node(Node) ->
+ [tracked_channel_table_name_for(Node),
+ tracked_channel_per_user_table_name_for(Node)].
+
+get_tracked_channels_by_connection_pid(ConnPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{connection = ConnPid, _ = '_'}).
+
+get_tracked_channel_by_pid(ChPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{pid = ChPid, _ = '_'}).
+
+delete_tracked_channel_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_channel_per_user_table_name_for/1,
+ Username).
+
+tracked_channel_from_channel_created_event(ChannelDetails) ->
+ Node = node(ChPid = pget(pid, ChannelDetails)),
+ Name = pget(name, ChannelDetails),
+ #tracked_channel{
+ id = rabbit_tracking:id(Node, Name),
+ name = Name,
+ node = Node,
+ vhost = pget(vhost, ChannelDetails),
+ pid = ChPid,
+ connection = pget(connection, ChannelDetails),
+ username = pget(user, ChannelDetails)}.
+
+close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
+ [rabbit_channel:shutdown(ChPid)
+ || #tracked_channel{pid = ChPid} <- TrackedChannels],
+ ok;
+close_channels(_TrackedChannels = []) -> ok.
diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl
new file mode 100644
index 0000000000..b2556b8742
--- /dev/null
+++ b/src/rabbit_channel_tracking_handler.erl
@@ -0,0 +1,80 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking_handler).
+
+%% This module keeps track of channel creation and termination events
+%% on its local node. Similar to the rabbit_connection_tracking_handler,
+%% the primary goal here is to decouple channel tracking from rabbit_reader
+%% and isolate channel tracking to its own process to avoid blocking connection
+%% creation events. Additionaly, creation events are also non-blocking in that
+%% they spawn a short-live process for updating the tracking tables in realtime.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "channel tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [channel_tracking]},
+ {enables, recovery}]}).
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = channel_created, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}),
+ {ok, State};
+handle_event(#event{type = channel_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}),
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}),
+ {ok, State};
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}),
+ {ok, State};
+%% A node had been deleted from the cluster.
+handle_event(#event{type = node_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index c9647acbe4..85ce317bba 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -37,6 +37,7 @@
tracked_connection_table_name_for/1,
tracked_connection_per_vhost_table_name_for/1,
tracked_connection_per_user_table_name_for/1,
+ get_all_tracked_connection_table_names_for_node/1,
delete_tracked_connections_table_for_node/1,
delete_per_vhost_tracked_connections_table_for_node/1,
@@ -128,7 +129,7 @@ handle_cast({connection_closed, Details}) ->
handle_cast({vhost_deleted, Details}) ->
VHost = pget(name, Details),
%% Schedule vhost entry deletion, allowing time for connections to close
- _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_vhost_entry, [VHost]),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
shutdown_tracked_items(
@@ -149,7 +150,7 @@ handle_cast({vhost_down, Details}) ->
handle_cast({user_deleted, Details}) ->
Username = pget(name, Details),
%% Schedule user entry deletion, allowing time for connections to close
- _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_user_entry, [Username]),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
shutdown_tracked_items(
diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl
new file mode 100644
index 0000000000..b2984d832c
--- /dev/null
+++ b/test/per_user_connection_channel_tracking_SUITE.erl
@@ -0,0 +1,840 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(per_user_connection_channel_tracking_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
+ ].
+
+groups() ->
+ ClusterSize1Tests = [
+ single_node_user_connection_channel_tracking,
+ single_node_user_deletion,
+ single_node_vhost_down_mimic,
+ single_node_vhost_deletion
+ ],
+ ClusterSize2Tests = [
+ cluster_user_deletion,
+ cluster_vhost_down_mimic,
+ cluster_vhost_deletion,
+ cluster_node_removed
+ ],
+ [
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2).
+
+init_per_multinode_group(_Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ Config.
+
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+single_node_user_connection_channel_tracking(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ [Chan1] = open_channels(Conn1, 1),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ [#tracked_channel{username = Username}] = channels_in(Config, Username),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ ?assertEqual(true, is_process_alive(Chan1)),
+ close_channels([Chan1]),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Chan1)),
+ close_connections([Conn1]),
+ ?assertEqual(0, length(connections_in(Config, Username))),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ timer:sleep(100),
+ [#tracked_connection{username = Username2}] = connections_in(Config, Username2),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn3)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3],
+
+ [Conn4] = open_connections(Config, [0]),
+ Chans4 = [_|_] = open_channels(Conn4, 5),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(10, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn4)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
+ kill_connections([Conn4]),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn4)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 7),
+ [Username, Username] =
+ lists:map(fun (#tracked_connection{username = U}) -> U end,
+ connections_in(Config, Username)),
+ ?assertEqual(12, count_channels_in(Config, Username)),
+ ?assertEqual(12, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn5)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5),
+ ?assertEqual(0, length(all_channels(Config))),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, length(all_connections(Config))).
+
+single_node_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)),
+ ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)),
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+single_node_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost).
+
+single_node_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% mimic vhost down event, while connections exist
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure user entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+cluster_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
+ rabbit_ct_broker_helpers:add_user(Config, Username),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost).
+
+cluster_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ mimic_vhost_down(Config, 1, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% gen_event notifies local handlers. remote connections still active
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_node_removed(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ timer:sleep(200),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
+ timer:sleep(200),
+ NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+ DroppedConnTrackingTables =
+ rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables],
+
+ DroppedChTrackingTables =
+ rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables],
+
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndUsers) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
+ Conns = lists:map(fun
+ ({Node, User}) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
+ User, User);
+ (Node) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
+ end, NodesAndUsers),
+ timer:sleep(500),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns),
+ timer:sleep(500).
+
+kill_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ (catch exit(Conn, please_terminate))
+ end, Conns),
+ timer:sleep(500).
+
+open_channels(Conn, N) ->
+ [begin
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ Ch
+ end || _ <- lists:seq(1, N)].
+
+close_channels(Channels = [_|_]) ->
+ [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels].
+
+count_connections_in(Config, Username) ->
+ length(connections_in(Config, Username)).
+
+connections_in(Config, Username) ->
+ connections_in(Config, 0, Username).
+connections_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+count_channels_in(Config, Username) ->
+ Channels = channels_in(Config, Username),
+ length([Ch || Ch = #tracked_channel{username = Username0} <- Channels,
+ Username =:= Username0]).
+
+channels_in(Config, Username) ->
+ channels_in(Config, 0, Username).
+channels_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_of_user, [Username]).
+
+tracked_user_connection_count(Config, Username) ->
+ tracked_user_connection_count(Config, 0, Username).
+tracked_user_connection_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+tracked_user_channel_count(Config, Username) ->
+ tracked_user_channel_count(Config, 0, Username).
+tracked_user_channel_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ count_tracked_items_in, [{user, Username}]).
+
+exists_in_tracked_connection_per_vhost_table(Config, VHost) ->
+ exists_in_tracked_connection_per_vhost_table(Config, 0, VHost).
+exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1,
+ VHost).
+
+exists_in_tracked_connection_per_user_table(Config, Username) ->
+ exists_in_tracked_connection_per_user_table(Config, 0, Username).
+exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracked_channel_per_user_table(Config, Username) ->
+ exists_in_tracked_channel_per_user_table(Config, 0, Username).
+exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ Tab = TableNameFun(Node),
+ AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ mnesia,
+ dirty_all_keys, [Tab]),
+ lists:member(Key, AllKeys).
+
+mimic_vhost_down(Config, NodeIndex, VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_vhost, vhost_down, [VHost]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_connection_tracking).
+
+all_channels(Config) ->
+ all_channels(Config, 0).
+all_channels(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_channel_tracking).
+
+all_tracked_items(Config, NodeIndex, TrackingMod) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list, []).
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_connection_limit(Config, VHost, -1).
+
+set_vhost_connection_limit(Config, VHost, Count) ->
+ set_vhost_connection_limit(Config, 0, VHost, Count).
+
+set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+set_tracking_execution_timeout(Config, Timeout) ->
+ set_tracking_execution_timeout(Config, 0, Timeout).
+set_tracking_execution_timeout(Config, NodeIndex, Timeout) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ application, set_env,
+ [rabbit, tracking_execution_timeout, Timeout]).
+
+get_tracking_execution_timeout(Config) ->
+ get_tracking_execution_timeout(Config, 0).
+get_tracking_execution_timeout(Config, NodeIndex) ->
+ {ok, Timeout} = rabbit_ct_broker_helpers:rpc(
+ Config, NodeIndex,
+ application, get_env,
+ [rabbit, tracking_execution_timeout]),
+ Timeout.
+
+await_running_node_refresh(_Config, _NodeIndex) ->
+ timer:sleep(250).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).