diff options
| author | Ayanda-D <ayanda.dube@erlang-solutions.com> | 2020-06-10 10:10:48 +0100 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-09-02 04:28:58 +0300 |
| commit | 3ea173725611662a666c95c336c3f3d0d34ff74a (patch) | |
| tree | e7510513f46580965a1e5365efcea456a8d2555f | |
| parent | 275cecce2f9f429a77190712ad3f45afcbc31459 (diff) | |
| download | rabbitmq-server-git-3ea173725611662a666c95c336c3f3d0d34ff74a.tar.gz | |
Introduce per-user channel tracking
Make 'tracking_execution_timeout' configurable
Add per_user_connection_channel_tracking_SUITE
| -rw-r--r-- | Makefile | 4 | ||||
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel_tracking.erl | 298 | ||||
| -rw-r--r-- | src/rabbit_channel_tracking_handler.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 5 | ||||
| -rw-r--r-- | test/per_user_connection_channel_tracking_SUITE.erl | 840 |
6 files changed, 1229 insertions, 3 deletions
@@ -115,7 +115,9 @@ define PROJECT_ENV %% Default max message size is 128 MB {max_message_size, 134217728}, %% Socket writer will run GC every 1 GB of outgoing data - {writer_gc_threshold, 1000000000} + {writer_gc_threshold, 1000000000}, + %% interval at which connection/channel tracking executes post operations + {tracking_execution_timeout, 5000} ] endef diff --git a/src/rabbit.erl b/src/rabbit.erl index f58ae1db94..f057fb0da3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,6 +199,11 @@ {mfa, {rabbit_connection_tracking, boot, []}}, {enables, routing_ready}]}). +-rabbit_boot_step({channel_tracking, + [{description, "channel tracking infrastructure"}, + {mfa, {rabbit_channel_tracking, boot, []}}, + {enables, routing_ready}]}). + -rabbit_boot_step({background_gc, [{description, "background garbage collection"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl new file mode 100644 index 0000000000..aac3b92e1b --- /dev/null +++ b/src/rabbit_channel_tracking.erl @@ -0,0 +1,298 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_channel_tracking_handler +%% * rabbit_reader +%% * rabbit_event +-behaviour(rabbit_tracking). + +-export([boot/0, + update_tracked/1, + handle_cast/1, + register_tracked/1, + unregister_tracked/1, + count_tracked_items_in/1, + clear_tracking_tables/0, + shutdown_tracked_items/2]). + +-export([list/0, list_of_user/1, list_on_node/1, + tracked_channel_table_name_for/1, + tracked_channel_per_user_table_name_for/1, + get_all_tracked_channel_table_names_for_node/1, + delete_tracked_channel_user_entry/1]). + +-include_lib("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +%% +%% API +%% + +%% Sets up and resets channel tracking tables for this node. +-spec boot() -> ok. + +boot() -> + ensure_tracked_channels_table_for_this_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_table_name_for(node())]), + ensure_per_user_tracked_channels_table_for_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_per_user_table_name_for(node())]), + clear_tracking_tables(), + ok. + +-spec update_tracked(term()) -> ok. + +update_tracked(Event) -> + spawn(?MODULE, handle_cast, [Event]). + +%% Asynchronously handle update events +-spec handle_cast(term()) -> ok. + +handle_cast({channel_created, Details}) -> + ThisNode = node(), + case node(pget(pid, Details)) of + ThisNode -> + TrackedCh = #tracked_channel{id = TrackedChId} = + tracked_channel_from_channel_created_event(Details), + try + register_tracked(TrackedCh) + catch + error:{no_exists, _} -> + Msg = "Could not register channel ~p for tracking, " + "its table is not ready yet or the channel terminated prematurely", + rabbit_log_connection:warning(Msg, [TrackedChId]), + ok; + error:Err -> + Msg = "Could not register channel ~p for tracking: ~p", + rabbit_log_connection:warning(Msg, [TrackedChId, Err]), + ok + end; + _OtherNode -> + %% ignore + ok + end; +handle_cast({channel_closed, Details}) -> + %% channel has terminated, unregister iff local + case get_tracked_channel_by_pid(pget(pid, Details)) of + [#tracked_channel{name = Name}] -> + unregister_tracked(rabbit_tracking:id(node(), Name)); + _Other -> ok + end; +handle_cast({connection_closed, ConnDetails}) -> + ThisNode= node(), + ConnPid = pget(pid, ConnDetails), + + case pget(node, ConnDetails) of + ThisNode -> + TrackedChs = get_tracked_channels_by_connection_pid(ConnPid), + rabbit_log_connection:info( + "Closing all channels from connection '~p' " + "because it has been closed", [pget(name, ConnDetails)]), + shutdown_tracked_items(TrackedChs, undefined), + [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) || + #tracked_channel{name = Name} <- TrackedChs], + ok; + _DifferentNode -> + ok + end; +handle_cast({user_deleted, Details}) -> + Username = pget(name, Details), + %% Schedule user entry deletion, allowing time for connections to close + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_channel_user_entry, [Username]), + ok; +handle_cast({node_deleted, Details}) -> + Node = pget(node, Details), + rabbit_log_connection:info( + "Node '~s' was removed from the cluster, deleting" + " its channel tracking tables...", [Node]), + delete_tracked_channels_table_for_node(Node), + delete_per_user_tracked_channels_table_for_node(Node). + +-spec register_tracked(rabbit_types:tracked_channel()) -> ok. +-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]). + +register_tracked(TrackedCh = + #tracked_channel{node = Node, name = Name, username = Username}) -> + ChId = rabbit_tracking:id(Node, Name), + TableName = tracked_channel_table_name_for(Node), + PerUserChTableName = tracked_channel_per_user_table_name_for(Node), + %% upsert + case mnesia:dirty_read(TableName, ChId) of + [] -> + mnesia:dirty_write(TableName, TrackedCh), + mnesia:dirty_update_counter(PerUserChTableName, Username, 1); + [#tracked_channel{}] -> + ok + end, + ok. + +-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok. + +unregister_tracked(ChId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_channel_table_name_for(Node), + PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:dirty_read(TableName, ChId) of + [] -> ok; + [#tracked_channel{username = Username}] -> + mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1), + mnesia:dirty_delete(TableName, ChId) + end. + +-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer(). + +count_tracked_items_in({user, Username}) -> + rabbit_tracking:count_tracked_items( + fun tracked_channel_per_user_table_name_for/1, + #tracked_channel_per_user.channel_count, Username, + "channels in vhost"). + +-spec clear_tracking_tables() -> ok. + +clear_tracking_tables() -> + clear_tracked_channel_tables_for_this_node(). + +-spec shutdown_tracked_items(list(), term()) -> ok. + +shutdown_tracked_items(TrackedItems, _Args) -> + close_channels(TrackedItems). + +%% helper functions +-spec list() -> [rabbit_types:tracked_channel()]. + +list() -> + lists:foldl( + fun (Node, Acc) -> + Tab = tracked_channel_table_name_for(Node), + Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'}) + end, [], rabbit_mnesia:cluster_nodes(running)). + +-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()]. + +list_of_user(Username) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{username = Username, _ = '_'}). + +-spec list_on_node(node()) -> [rabbit_types:tracked_channel()]. + +list_on_node(Node) -> + try mnesia:dirty_match_object( + tracked_channel_table_name_for(Node), + #tracked_channel{_ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + +-spec tracked_channel_table_name_for(node()) -> atom(). + +tracked_channel_table_name_for(Node) -> + list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])). + +-spec tracked_channel_per_user_table_name_for(node()) -> atom(). + +tracked_channel_per_user_table_name_for(Node) -> + list_to_atom(rabbit_misc:format( + "tracked_channel_table_per_user_on_node_~s", [Node])). + +%% internal +ensure_tracked_channels_table_for_this_node() -> + ensure_tracked_channels_table_for_node(node()). + +ensure_per_user_tracked_channels_table_for_node() -> + ensure_per_user_tracked_channels_table_for_node(node()). + +%% Create tables +ensure_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel}, + {attributes, record_info(fields, tracked_channel)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +ensure_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user}, + {attributes, record_info(fields, tracked_channel_per_user)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +clear_tracked_channel_tables_for_this_node() -> + [rabbit_tracking:clear_tracking_table(T) + || T <- get_all_tracked_channel_table_names_for_node(node())]. + +delete_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel"). + +delete_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, + "per-user tracked channels"). + +get_all_tracked_channel_table_names_for_node(Node) -> + [tracked_channel_table_name_for(Node), + tracked_channel_per_user_table_name_for(Node)]. + +get_tracked_channels_by_connection_pid(ConnPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{connection = ConnPid, _ = '_'}). + +get_tracked_channel_by_pid(ChPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{pid = ChPid, _ = '_'}). + +delete_tracked_channel_user_entry(Username) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_auth_backend_internal, exists, [Username]}, + fun tracked_channel_per_user_table_name_for/1, + Username). + +tracked_channel_from_channel_created_event(ChannelDetails) -> + Node = node(ChPid = pget(pid, ChannelDetails)), + Name = pget(name, ChannelDetails), + #tracked_channel{ + id = rabbit_tracking:id(Node, Name), + name = Name, + node = Node, + vhost = pget(vhost, ChannelDetails), + pid = ChPid, + connection = pget(connection, ChannelDetails), + username = pget(user, ChannelDetails)}. + +close_channels(TrackedChannels = [#tracked_channel{}|_]) -> + [rabbit_channel:shutdown(ChPid) + || #tracked_channel{pid = ChPid} <- TrackedChannels], + ok; +close_channels(_TrackedChannels = []) -> ok. diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl new file mode 100644 index 0000000000..b2556b8742 --- /dev/null +++ b/src/rabbit_channel_tracking_handler.erl @@ -0,0 +1,80 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking_handler). + +%% This module keeps track of channel creation and termination events +%% on its local node. Similar to the rabbit_connection_tracking_handler, +%% the primary goal here is to decouple channel tracking from rabbit_reader +%% and isolate channel tracking to its own process to avoid blocking connection +%% creation events. Additionaly, creation events are also non-blocking in that +%% they spawn a short-live process for updating the tracking tables in realtime. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "channel tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [channel_tracking]}, + {enables, recovery}]}). + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = channel_created, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}), + {ok, State}; +handle_event(#event{type = channel_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}), + {ok, State}; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}), + {ok, State}; +%% A node had been deleted from the cluster. +handle_event(#event{type = node_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index c9647acbe4..85ce317bba 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -37,6 +37,7 @@ tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, tracked_connection_per_user_table_name_for/1, + get_all_tracked_connection_table_names_for_node/1, delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, @@ -128,7 +129,7 @@ handle_cast({connection_closed, Details}) -> handle_cast({vhost_deleted, Details}) -> VHost = pget(name, Details), %% Schedule vhost entry deletion, allowing time for connections to close - _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, delete_tracked_connection_vhost_entry, [VHost]), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), shutdown_tracked_items( @@ -149,7 +150,7 @@ handle_cast({vhost_down, Details}) -> handle_cast({user_deleted, Details}) -> Username = pget(name, Details), %% Schedule user entry deletion, allowing time for connections to close - _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, delete_tracked_connection_user_entry, [Username]), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), shutdown_tracked_items( diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl new file mode 100644 index 0000000000..b2984d832c --- /dev/null +++ b/test/per_user_connection_channel_tracking_SUITE.erl @@ -0,0 +1,840 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(per_user_connection_channel_tracking_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} + ]. + +groups() -> + ClusterSize1Tests = [ + single_node_user_connection_channel_tracking, + single_node_user_deletion, + single_node_vhost_down_mimic, + single_node_vhost_deletion + ], + ClusterSize2Tests = [ + cluster_user_deletion, + cluster_vhost_down_mimic, + cluster_vhost_deletion, + cluster_node_removed + ], + [ + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2). + +init_per_multinode_group(_Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + clear_all_connection_tracking_tables(Config), + Config. + +end_per_testcase(Testcase, Config) -> + clear_all_connection_tracking_tables(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +clear_all_connection_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_connection_tracking, + clear_tracking_tables, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- +single_node_user_connection_channel_tracking(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + [Chan1] = open_channels(Conn1, 1), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + [#tracked_channel{username = Username}] = channels_in(Config, Username), + ?assertEqual(true, is_process_alive(Conn1)), + ?assertEqual(true, is_process_alive(Chan1)), + close_channels([Chan1]), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Chan1)), + close_connections([Conn1]), + ?assertEqual(0, length(connections_in(Config, Username))), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + timer:sleep(100), + [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn3)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3], + + [Conn4] = open_connections(Config, [0]), + Chans4 = [_|_] = open_channels(Conn4, 5), + ?assertEqual(2, tracked_user_connection_count(Config, Username)), + ?assertEqual(10, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn4)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4], + kill_connections([Conn4]), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn4)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4], + + [Conn5] = open_connections(Config, [0]), + Chans5 = [_|_] = open_channels(Conn5, 7), + [Username, Username] = + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username)), + ?assertEqual(12, count_channels_in(Config, Username)), + ?assertEqual(12, tracked_user_channel_count(Config, Username)), + ?assertEqual(2, tracked_user_connection_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn5)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5], + + close_channels(Chans2 ++ Chans3 ++ Chans5), + ?assertEqual(0, length(all_channels(Config))), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + close_connections([Conn2, Conn3, Conn5]), + rabbit_ct_broker_helpers:delete_user(Config, Username2), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, length(all_connections(Config))). + +single_node_user_deletion(Config) -> + set_tracking_execution_timeout(Config, 100), + + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)), + ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)), + + rabbit_ct_broker_helpers:delete_user(Config, Username2), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)), + ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)), + + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +single_node_vhost_deletion(Config) -> + set_tracking_execution_timeout(Config, 100), + + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + + rabbit_ct_broker_helpers:add_vhost(Config, Vhost). + +single_node_vhost_down_mimic(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + %% mimic vhost down event, while connections exist + mimic_vhost_down(Config, 0, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + +cluster_user_deletion(Config) -> + set_tracking_execution_timeout(Config, 0, 100), + set_tracking_execution_timeout(Config, 1, 100), + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), + ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), + ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + + rabbit_ct_broker_helpers:delete_user(Config, Username2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% ensure user entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), + ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +cluster_vhost_deletion(Config) -> + set_tracking_execution_timeout(Config, 0, 100), + set_tracking_execution_timeout(Config, 1, 100), + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), + ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), + + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), + + rabbit_ct_broker_helpers:add_vhost(Config, Vhost), + rabbit_ct_broker_helpers:add_user(Config, Username), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost). + +cluster_vhost_down_mimic(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + mimic_vhost_down(Config, 1, Vhost), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% gen_event notifies local handlers. remote connections still active + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + mimic_vhost_down(Config, 0, Vhost), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + +cluster_node_removed(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + rabbit_ct_broker_helpers:stop_broker(Config, 1), + timer:sleep(200), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1), + timer:sleep(200), + NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + DroppedConnTrackingTables = + rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName), + [?assertEqual( + {'EXIT', {aborted, {no_exists, Tab, all}}}, + catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables], + + DroppedChTrackingTables = + rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName), + [?assertEqual( + {'EXIT', {aborted, {no_exists, Tab, all}}}, + catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables], + + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +open_connections(Config, NodesAndUsers) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, + Conns = lists:map(fun + ({Node, User}) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, + User, User); + (Node) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) + end, NodesAndUsers), + timer:sleep(500), + Conns. + +close_connections(Conns) -> + lists:foreach(fun + (Conn) -> + rabbit_ct_client_helpers:close_connection(Conn) + end, Conns), + timer:sleep(500). + +kill_connections(Conns) -> + lists:foreach(fun + (Conn) -> + (catch exit(Conn, please_terminate)) + end, Conns), + timer:sleep(500). + +open_channels(Conn, N) -> + [begin + {ok, Ch} = amqp_connection:open_channel(Conn), + Ch + end || _ <- lists:seq(1, N)]. + +close_channels(Channels = [_|_]) -> + [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels]. + +count_connections_in(Config, Username) -> + length(connections_in(Config, Username)). + +connections_in(Config, Username) -> + connections_in(Config, 0, Username). +connections_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username). + +count_channels_in(Config, Username) -> + Channels = channels_in(Config, Username), + length([Ch || Ch = #tracked_channel{username = Username0} <- Channels, + Username =:= Username0]). + +channels_in(Config, Username) -> + channels_in(Config, 0, Username). +channels_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username). + +tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list_of_user, [Username]). + +tracked_user_connection_count(Config, Username) -> + tracked_user_connection_count(Config, 0, Username). +tracked_user_connection_count(Config, NodeIndex, Username) -> + count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username). + +tracked_user_channel_count(Config, Username) -> + tracked_user_channel_count(Config, 0, Username). +tracked_user_channel_count(Config, NodeIndex, Username) -> + count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username). + +count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + count_tracked_items_in, [{user, Username}]). + +exists_in_tracked_connection_per_vhost_table(Config, VHost) -> + exists_in_tracked_connection_per_vhost_table(Config, 0, VHost). +exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1, + VHost). + +exists_in_tracked_connection_per_user_table(Config, Username) -> + exists_in_tracked_connection_per_user_table(Config, 0, Username). +exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1, + Username). + +exists_in_tracked_channel_per_user_table(Config, Username) -> + exists_in_tracked_channel_per_user_table(Config, 0, Username). +exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1, + Username). + +exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + Tab = TableNameFun(Node), + AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + mnesia, + dirty_all_keys, [Tab]), + lists:member(Key, AllKeys). + +mimic_vhost_down(Config, NodeIndex, VHost) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_vhost, vhost_down, [VHost]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_connection_tracking). + +all_channels(Config) -> + all_channels(Config, 0). +all_channels(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_channel_tracking). + +all_tracked_items(Config, NodeIndex, TrackingMod) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list, []). + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost), + set_vhost_connection_limit(Config, VHost, -1). + +set_vhost_connection_limit(Config, VHost, Count) -> + set_vhost_connection_limit(Config, 0, VHost, Count). + +set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_vhost_limits, Node, + ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], + [{"-p", binary_to_list(VHost)}]). + +set_tracking_execution_timeout(Config, Timeout) -> + set_tracking_execution_timeout(Config, 0, Timeout). +set_tracking_execution_timeout(Config, NodeIndex, Timeout) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + application, set_env, + [rabbit, tracking_execution_timeout, Timeout]). + +get_tracking_execution_timeout(Config) -> + get_tracking_execution_timeout(Config, 0). +get_tracking_execution_timeout(Config, NodeIndex) -> + {ok, Timeout} = rabbit_ct_broker_helpers:rpc( + Config, NodeIndex, + application, get_env, + [rabbit, tracking_execution_timeout]), + Timeout. + +await_running_node_refresh(_Config, _NodeIndex) -> + timer:sleep(250). + +expect_that_client_connection_is_rejected(Config) -> + expect_that_client_connection_is_rejected(Config, 0). + +expect_that_client_connection_is_rejected(Config, NodeIndex) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex). + +expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost). |
