summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-08-31 08:37:22 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-08-31 08:37:22 +0300
commit58a3b450ab3cabe29576bfb6504449c94ae1ac62 (patch)
tree8e59bacdd4b2bcafaf12f2e3444e5e7a314d3be5
parent716d293e0dfc727700dfb93be64081c5cbf6dd5a (diff)
parent37641110273bdfe274d378a549cb9343980be8f2 (diff)
downloadrabbitmq-server-git-58a3b450ab3cabe29576bfb6504449c94ae1ac62.tar.gz
Merge branch 'Ayanda-D-rabbitmq-per-user-connection-channel-limits'
-rw-r--r--Makefile4
-rw-r--r--src/internal_user.erl203
-rw-r--r--src/internal_user_v1.erl138
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_auth_backend_internal.erl179
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_channel_tracking.erl289
-rw-r--r--src/rabbit_channel_tracking_handler.erl71
-rw-r--r--src/rabbit_connection_tracking.erl350
-rw-r--r--src/rabbit_connection_tracking_handler.erl22
-rw-r--r--src/rabbit_core_ff.erl25
-rw-r--r--src/rabbit_definitions.erl17
-rw-r--r--src/rabbit_direct.erl74
-rw-r--r--src/rabbit_mirror_queue_misc.erl12
-rw-r--r--src/rabbit_mnesia_rename.erl2
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl16
-rw-r--r--src/rabbit_quorum_queue.erl4
-rw-r--r--src/rabbit_reader.erl43
-rw-r--r--src/rabbit_table.erl4
-rw-r--r--src/rabbit_tracking.erl103
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_vhost_limit.erl3
-rw-r--r--test/per_user_connection_channel_limit_SUITE.erl1625
-rw-r--r--test/per_user_connection_channel_limit_partitions_SUITE.erl174
-rw-r--r--test/per_user_connection_channel_tracking_SUITE.erl840
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl4
-rw-r--r--test/per_vhost_connection_limit_partitions_SUITE.erl21
-rw-r--r--test/unit_access_control_SUITE.erl29
-rw-r--r--test/vhost_SUITE.erl2
31 files changed, 3950 insertions, 321 deletions
diff --git a/Makefile b/Makefile
index 5f9373c9b3..3c38e5f57c 100644
--- a/Makefile
+++ b/Makefile
@@ -115,7 +115,9 @@ define PROJECT_ENV
%% Default max message size is 128 MB
{max_message_size, 134217728},
%% Socket writer will run GC every 1 GB of outgoing data
- {writer_gc_threshold, 1000000000}
+ {writer_gc_threshold, 1000000000},
+ %% interval at which connection/channel tracking executes post operations
+ {tracking_execution_timeout, 15000}
]
endef
diff --git a/src/internal_user.erl b/src/internal_user.erl
new file mode 100644
index 0000000000..35b67741ab
--- /dev/null
+++ b/src/internal_user.erl
@@ -0,0 +1,203 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(internal_user).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([
+ new/0,
+ new/1,
+ record_version_to_use/0,
+ fields/0,
+ fields/1,
+ upgrade/1,
+ upgrade_to/2,
+ pattern_match_all/0,
+ get_username/1,
+ get_password_hash/1,
+ get_tags/1,
+ get_hashing_algorithm/1,
+ get_limits/1,
+ create_user/3,
+ set_password_hash/3,
+ set_tags/2,
+ update_limits/3,
+ clear_limits/1
+]).
+
+-define(record_version, internal_user_v2).
+
+-type(username() :: binary()).
+
+-type(password_hash() :: binary()).
+
+-type internal_user() :: internal_user_v1:internal_user_v1() | internal_user_v2().
+
+-record(internal_user, {
+ username :: username() | '_',
+ password_hash :: password_hash() | '_',
+ tags :: [atom()] | '_',
+ %% password hashing implementation module,
+ %% typically rabbit_password_hashing_* but can
+ %% come from a plugin
+ hashing_algorithm :: atom() | '_',
+ limits = #{} :: map() | '_'}).
+
+-type(internal_user_v2() ::
+ #internal_user{username :: username(),
+ password_hash :: password_hash(),
+ tags :: [atom()],
+ hashing_algorithm :: atom(),
+ limits :: map()}).
+
+-type internal_user_pattern() :: internal_user_v1:internal_user_v1_pattern() |
+ internal_user_v2_pattern().
+
+-type internal_user_v2_pattern() :: #internal_user{
+ username :: username() | '_',
+ password_hash :: '_',
+ tags :: '_',
+ hashing_algorithm :: '_',
+ limits :: '_'
+ }.
+
+-export_type([username/0,
+ password_hash/0,
+ internal_user/0,
+ internal_user_v2/0,
+ internal_user_pattern/0,
+ internal_user_v2_pattern/0]).
+
+-spec new() -> internal_user().
+new() ->
+ case record_version_to_use() of
+ ?record_version ->
+ #internal_user{};
+ _ ->
+ internal_user_v1:new()
+ end.
+
+-spec new(tuple()) -> internal_user().
+new({hashing_algorithm, HashingAlgorithm}) ->
+ case record_version_to_use() of
+ ?record_version ->
+ #internal_user{hashing_algorithm = HashingAlgorithm};
+ _ ->
+ internal_user_v1:new({hashing_algorithm, HashingAlgorithm})
+ end;
+new({tags, Tags}) ->
+ case record_version_to_use() of
+ ?record_version ->
+ #internal_user{tags = Tags};
+ _ ->
+ internal_user_v1:new({tags, Tags})
+ end.
+
+-spec record_version_to_use() -> internal_user_v1 | internal_user_v2.
+record_version_to_use() ->
+ case rabbit_feature_flags:is_enabled(user_limits) of
+ true -> ?record_version;
+ false -> internal_user_v1:record_version_to_use()
+ end.
+
+-spec fields() -> list().
+fields() ->
+ case record_version_to_use() of
+ ?record_version -> fields(?record_version);
+ _ -> internal_user_v1:fields()
+ end.
+
+-spec fields(atom()) -> list().
+fields(?record_version) -> record_info(fields, internal_user);
+fields(Version) -> internal_user_v1:fields(Version).
+
+-spec upgrade(internal_user()) -> internal_user().
+upgrade(#internal_user{} = User) -> User;
+upgrade(OldUser) -> upgrade_to(record_version_to_use(), OldUser).
+
+-spec upgrade_to
+(internal_user_v2, internal_user()) -> internal_user_v2();
+(internal_user_v1, internal_user_v1:internal_user_v1()) -> internal_user_v1:internal_user_v1().
+
+upgrade_to(?record_version, #internal_user{} = User) ->
+ User;
+upgrade_to(?record_version, OldUser) ->
+ Fields = erlang:tuple_to_list(OldUser) ++ [#{}],
+ #internal_user{} = erlang:list_to_tuple(Fields);
+upgrade_to(Version, OldUser) ->
+ internal_user_v1:upgrade_to(Version, OldUser).
+
+-spec pattern_match_all() -> internal_user_pattern().
+pattern_match_all() ->
+ case record_version_to_use() of
+ ?record_version -> #internal_user{_ = '_'};
+ _ -> internal_user_v1:pattern_match_all()
+ end.
+
+-spec get_username(internal_user()) -> username().
+get_username(#internal_user{username = Value}) -> Value;
+get_username(User) -> internal_user_v1:get_username(User).
+
+-spec get_password_hash(internal_user()) -> password_hash().
+get_password_hash(#internal_user{password_hash = Value}) -> Value;
+get_password_hash(User) -> internal_user_v1:get_password_hash(User).
+
+-spec get_tags(internal_user()) -> [atom()].
+get_tags(#internal_user{tags = Value}) -> Value;
+get_tags(User) -> internal_user_v1:get_tags(User).
+
+-spec get_hashing_algorithm(internal_user()) -> atom().
+get_hashing_algorithm(#internal_user{hashing_algorithm = Value}) -> Value;
+get_hashing_algorithm(User) -> internal_user_v1:get_hashing_algorithm(User).
+
+-spec get_limits(internal_user()) -> map().
+get_limits(#internal_user{limits = Value}) -> Value;
+get_limits(User) -> internal_user_v1:get_limits(User).
+
+-spec create_user(username(), password_hash(), atom()) -> internal_user().
+create_user(Username, PasswordHash, HashingMod) ->
+ case record_version_to_use() of
+ ?record_version ->
+ #internal_user{username = Username,
+ password_hash = PasswordHash,
+ tags = [],
+ hashing_algorithm = HashingMod,
+ limits = #{}
+ };
+ _ ->
+ internal_user_v1:create_user(Username, PasswordHash, HashingMod)
+ end.
+
+-spec set_password_hash(internal_user(), password_hash(), atom()) -> internal_user().
+set_password_hash(#internal_user{} = User, PasswordHash, HashingAlgorithm) ->
+ User#internal_user{password_hash = PasswordHash,
+ hashing_algorithm = HashingAlgorithm};
+set_password_hash(User, PasswordHash, HashingAlgorithm) ->
+ internal_user_v1:set_password_hash(User, PasswordHash, HashingAlgorithm).
+
+-spec set_tags(internal_user(), [atom()]) -> internal_user().
+set_tags(#internal_user{} = User, Tags) ->
+ User#internal_user{tags = Tags};
+set_tags(User, Tags) ->
+ internal_user_v1:set_tags(User, Tags).
+
+-spec update_limits
+(add, internal_user(), map()) -> internal_user();
+(remove, internal_user(), term()) -> internal_user().
+update_limits(add, #internal_user{limits = Limits} = User, Term) ->
+ User#internal_user{limits = maps:merge(Limits, Term)};
+update_limits(remove, #internal_user{limits = Limits} = User, LimitType) ->
+ User#internal_user{limits = maps:remove(LimitType, Limits)};
+update_limits(Action, User, Term) ->
+ internal_user_v1:update_limits(Action, User, Term).
+
+-spec clear_limits(internal_user()) -> internal_user().
+clear_limits(#internal_user{} = User) ->
+ User#internal_user{limits = #{}};
+clear_limits(User) ->
+ internal_user_v1:update_limits(User).
diff --git a/src/internal_user_v1.erl b/src/internal_user_v1.erl
new file mode 100644
index 0000000000..0a290e5c53
--- /dev/null
+++ b/src/internal_user_v1.erl
@@ -0,0 +1,138 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(internal_user_v1).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([
+ new/0,
+ new/1,
+ record_version_to_use/0,
+ fields/0,
+ fields/1,
+ upgrade/1,
+ upgrade_to/2,
+ pattern_match_all/0,
+ get_username/1,
+ get_password_hash/1,
+ get_tags/1,
+ get_hashing_algorithm/1,
+ get_limits/1,
+ create_user/3,
+ set_password_hash/3,
+ set_tags/2,
+ update_limits/3,
+ clear_limits/1
+]).
+
+-define(record_version, ?MODULE).
+
+-record(internal_user, {
+ username :: internal_user:username() | '_',
+ password_hash :: internal_user:password_hash() | '_',
+ tags :: [atom()] | '_',
+ %% password hashing implementation module,
+ %% typically rabbit_password_hashing_* but can
+ %% come from a plugin
+ hashing_algorithm :: atom() | '_'}).
+
+-type internal_user() :: internal_user_v1().
+
+-type(internal_user_v1() ::
+ #internal_user{username :: internal_user:username(),
+ password_hash :: internal_user:password_hash(),
+ tags :: [atom()],
+ hashing_algorithm :: atom()}).
+
+-type internal_user_pattern() :: internal_user_v1_pattern().
+
+-type internal_user_v1_pattern() :: #internal_user{
+ username :: internal_user:username() | '_',
+ password_hash :: '_',
+ tags :: '_',
+ hashing_algorithm :: '_'
+ }.
+
+-export_type([internal_user/0,
+ internal_user_v1/0,
+ internal_user_pattern/0,
+ internal_user_v1_pattern/0]).
+
+-spec record_version_to_use() -> internal_user_v1.
+record_version_to_use() ->
+ ?record_version.
+
+-spec new() -> internal_user().
+new() ->
+ #internal_user{}.
+
+-spec new(tuple()) -> internal_user().
+new({hashing_algorithm, HashingAlgorithm}) ->
+ #internal_user{hashing_algorithm = HashingAlgorithm};
+new({tags, Tags}) ->
+ #internal_user{tags = Tags}.
+
+-spec fields() -> list().
+fields() -> fields(?record_version).
+
+-spec fields(atom()) -> list().
+fields(?record_version) -> record_info(fields, internal_user).
+
+-spec upgrade(internal_user()) -> internal_user().
+upgrade(#internal_user{} = User) -> User.
+
+-spec upgrade_to(internal_user_v1, internal_user()) -> internal_user().
+upgrade_to(?record_version, #internal_user{} = User) ->
+ User.
+
+-spec pattern_match_all() -> internal_user_pattern().
+pattern_match_all() -> #internal_user{_ = '_'}.
+
+-spec get_username(internal_user()) -> internal_user:username().
+get_username(#internal_user{username = Value}) -> Value.
+
+-spec get_password_hash(internal_user()) -> internal_user:password_hash().
+get_password_hash(#internal_user{password_hash = Value}) -> Value.
+
+-spec get_tags(internal_user()) -> [atom()].
+get_tags(#internal_user{tags = Value}) -> Value.
+
+-spec get_hashing_algorithm(internal_user()) -> atom().
+get_hashing_algorithm(#internal_user{hashing_algorithm = Value}) -> Value.
+
+-spec get_limits(internal_user()) -> map().
+get_limits(_User) -> #{}.
+
+-spec create_user(internal_user:username(), internal_user:password_hash(),
+ atom()) -> internal_user().
+create_user(Username, PasswordHash, HashingMod) ->
+ #internal_user{username = Username,
+ password_hash = PasswordHash,
+ tags = [],
+ hashing_algorithm = HashingMod
+ }.
+
+-spec set_password_hash(internal_user:internal_user(),
+ internal_user:password_hash(), atom()) -> internal_user().
+set_password_hash(#internal_user{} = User, PasswordHash, HashingAlgorithm) ->
+ User#internal_user{password_hash = PasswordHash,
+ hashing_algorithm = HashingAlgorithm}.
+
+-spec set_tags(internal_user(), [atom()]) -> internal_user().
+set_tags(#internal_user{} = User, Tags) ->
+ User#internal_user{tags = Tags}.
+
+-spec update_limits
+(add, internal_user(), map()) -> internal_user();
+(remove, internal_user(), term()) -> internal_user().
+update_limits(_, User, _) ->
+ User.
+
+-spec clear_limits(internal_user()) -> internal_user().
+clear_limits(User) ->
+ User.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f58ae1db94..f057fb0da3 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,6 +199,11 @@
{mfa, {rabbit_connection_tracking, boot, []}},
{enables, routing_ready}]}).
+-rabbit_boot_step({channel_tracking,
+ [{description, "channel tracking infrastructure"},
+ {mfa, {rabbit_channel_tracking, boot, []}},
+ {enables, routing_ready}]}).
+
-rabbit_boot_step({background_gc,
[{description, "background garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9818e689de..4b796b9d66 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -529,7 +529,7 @@ rebalance(Type, VhostSpec, QueueSpec) ->
maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) ->
rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'",
[Type, VhostSpec, QueueSpec]),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
NumRunning = length(Running),
ToRebalance = [Q || Q <- rabbit_amqqueue:list(),
filter_per_type(Type, Q),
@@ -1308,7 +1308,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
rabbit_control_misc:await_emitters_termination(Pids).
collect_info_all(VHostPath, Items) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
Ref = make_ref(),
Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ],
rabbit_control_misc:await_emitters_termination(Pids),
@@ -1896,7 +1896,7 @@ node_permits_offline_promotion(Node) ->
case node() of
Node -> not rabbit:is_running(); %% [1]
_ -> All = rabbit_mnesia:cluster_nodes(all),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
lists:member(Node, All) andalso
not lists:member(Node, Running) %% [2]
end.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 835fd9557b..cb930a1630 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -14,16 +14,19 @@
-export([user_login_authentication/2, user_login_authorization/2,
check_vhost_access/3, check_resource_access/4, check_topic_access/4]).
--export([add_user/3, delete_user/2, lookup_user/1,
+-export([add_user/3, delete_user/2, lookup_user/1, exists/1,
change_password/3, clear_password/2,
hash_password/2, change_password_hash/2, change_password_hash/3,
set_tags/3, set_permissions/6, clear_permissions/3,
set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4,
add_user_sans_validation/3, put_user/2, put_user/3]).
+-export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1,
+ is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]).
+
-export([user_info_keys/0, perms_info_keys/0,
user_perms_info_keys/0, vhost_perms_info_keys/0,
- user_vhost_perms_info_keys/0,
+ user_vhost_perms_info_keys/0, all_users/0,
list_users/0, list_users/2, list_permissions/0,
list_user_permissions/1, list_user_permissions/3,
list_topic_permissions/0,
@@ -47,9 +50,9 @@
%% there is no information in the record, we consider it to be legacy
%% (inserted by a version older than 3.6.0) and fall back to MD5, the
%% now obsolete hashing function.
-hashing_module_for_user(#internal_user{
- hashing_algorithm = ModOrUndefined}) ->
- rabbit_password:hashing_mod(ModOrUndefined).
+hashing_module_for_user(User) ->
+ ModOrUndefined = internal_user:get_hashing_algorithm(User),
+ rabbit_password:hashing_mod(ModOrUndefined).
-define(BLANK_PASSWORD_REJECTION_MESSAGE,
"user '~s' attempted to log in with a blank password, which is prohibited by the internal authN backend. "
@@ -75,13 +78,14 @@ user_login_authentication(Username, AuthProps) ->
{password, Cleartext} ->
internal_check_user_login(
Username,
- fun (#internal_user{
- password_hash = <<Salt:4/binary, Hash/binary>>
- } = U) ->
- Hash =:= rabbit_password:salted_hash(
- hashing_module_for_user(U), Salt, Cleartext);
- (#internal_user{}) ->
- false
+ fun(User) ->
+ case internal_user:get_password_hash(User) of
+ <<Salt:4/binary, Hash/binary>> ->
+ Hash =:= rabbit_password:salted_hash(
+ hashing_module_for_user(User), Salt, Cleartext);
+ _ ->
+ false
+ end
end);
false -> exit({unknown_auth_props, Username, AuthProps})
end.
@@ -97,7 +101,8 @@ user_login_authorization(Username, _AuthProps) ->
internal_check_user_login(Username, Fun) ->
Refused = {refused, "user '~s' - invalid credentials", [Username]},
case lookup_user(Username) of
- {ok, User = #internal_user{tags = Tags}} ->
+ {ok, User} ->
+ Tags = internal_user:get_tags(User),
case Fun(User) of
true -> {ok, #auth_user{username = Username,
tags = Tags,
@@ -207,10 +212,8 @@ add_user_sans_validation(Username, Password, ActingUser) ->
%% but we also need to store a hint as part of the record, so we
%% retrieve it here one more time
HashingMod = rabbit_password:hashing_mod(),
- User = #internal_user{username = Username,
- password_hash = hash_password(HashingMod, Password),
- tags = [],
- hashing_algorithm = HashingMod},
+ PasswordHash = hash_password(HashingMod, Password),
+ User = internal_user:create_user(Username, PasswordHash, HashingMod),
try
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -280,12 +283,20 @@ delete_user(Username, ActingUser) ->
-spec lookup_user
(rabbit_types:username()) ->
- rabbit_types:ok(rabbit_types:internal_user()) |
+ rabbit_types:ok(internal_user:internal_user()) |
rabbit_types:error('not_found').
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
+-spec exists(rabbit_types:username()) -> boolean().
+
+exists(Username) ->
+ case lookup_user(Username) of
+ {error, not_found} -> false;
+ _ -> true
+ end.
+
-spec change_password
(rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'.
@@ -343,9 +354,8 @@ change_password_hash(Username, PasswordHash) ->
change_password_hash(Username, PasswordHash, HashingAlgorithm) ->
update_user(Username, fun(User) ->
- User#internal_user{
- password_hash = PasswordHash,
- hashing_algorithm = HashingAlgorithm }
+ internal_user:set_password_hash(User,
+ PasswordHash, HashingAlgorithm)
end).
-spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'.
@@ -355,7 +365,7 @@ set_tags(Username, Tags, ActingUser) ->
rabbit_log:debug("Asked to set user tags for user '~s' to ~p", [Username, ConvertedTags]),
try
R = update_user(Username, fun(User) ->
- User#internal_user{tags = ConvertedTags}
+ internal_user:set_tags(User, ConvertedTags)
end),
rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]),
rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags},
@@ -657,11 +667,6 @@ put_user(User, Version, ActingUser) ->
T <- string:tokens(binary_to_list(TagsS), ",")]
end,
- UserExists = case rabbit_auth_backend_internal:lookup_user(Username) of
- {error, not_found} -> false;
- _ -> true
- end,
-
%% pre-configured, only applies to newly created users
Permissions = maps:get(permissions, User, undefined),
@@ -674,7 +679,7 @@ put_user(User, Version, ActingUser) ->
rabbit_credential_validation:validate(Username, Password) =:= ok
end,
- case UserExists of
+ case exists(Username) of
true ->
case {HasPassword, HasPasswordHash} of
{true, false} ->
@@ -763,6 +768,57 @@ preconfigure_permissions(Username, Map, ActingUser) when is_map(Map) ->
Map),
ok.
+set_user_limits(Username, Definition, ActingUser) when is_list(Definition); is_binary(Definition) ->
+ case rabbit_feature_flags:is_enabled(user_limits) of
+ true ->
+ case rabbit_json:try_decode(rabbit_data_coercion:to_binary(Definition)) of
+ {ok, Term} ->
+ validate_parameters_and_update_limit(Username, Term, ActingUser);
+ {error, Reason} ->
+ {error_string, rabbit_misc:format(
+ "JSON decoding error. Reason: ~ts", [Reason])}
+ end;
+ false -> {error_string, "cannot set any user limits: the user_limits feature flag is not enabled"}
+ end;
+set_user_limits(Username, Definition, ActingUser) when is_map(Definition) ->
+ case rabbit_feature_flags:is_enabled(user_limits) of
+ true -> validate_parameters_and_update_limit(Username, Definition, ActingUser);
+ false -> {error_string, "cannot set any user limits: the user_limits feature flag is not enabled"}
+ end.
+
+validate_parameters_and_update_limit(Username, Term, ActingUser) ->
+ case flatten_errors(rabbit_parameter_validation:proplist(
+ <<"user-limits">>, user_limit_validation(), Term)) of
+ ok ->
+ update_user(Username, fun(User) ->
+ internal_user:update_limits(add, User, Term)
+ end),
+ notify_limit_set(Username, ActingUser, Term);
+ {errors, [{Reason, Arguments}]} ->
+ {error_string, rabbit_misc:format(Reason, Arguments)}
+ end.
+
+user_limit_validation() ->
+ [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional},
+ {<<"max-channels">>, fun rabbit_parameter_validation:integer/2, optional}].
+
+clear_user_limits(Username, <<"all">>, ActingUser) ->
+ update_user(Username, fun(User) ->
+ internal_user:clear_limits(User)
+ end),
+ notify_limit_clear(Username, ActingUser);
+clear_user_limits(Username, LimitType, ActingUser) ->
+ update_user(Username, fun(User) ->
+ internal_user:update_limits(remove, User, LimitType)
+ end),
+ notify_limit_clear(Username, ActingUser).
+
+flatten_errors(L) ->
+ case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
+ [] -> ok;
+ E -> {errors, E}
+ end.
+
%%----------------------------------------------------------------------------
%% Listing
@@ -794,11 +850,13 @@ user_topic_perms_info_keys() -> [vhost, exchange, write, read].
vhost_topic_perms_info_keys() -> [user, exchange, write, read].
user_vhost_topic_perms_info_keys() -> [exchange, write, read].
+all_users() -> mnesia:dirty_match_object(rabbit_user, internal_user:pattern_match_all()).
+
-spec list_users() -> [rabbit_types:infos()].
list_users() ->
[extract_internal_user_params(U) ||
- U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})].
+ U <- all_users()].
-spec list_users(reference(), pid()) -> 'ok'.
@@ -806,7 +864,7 @@ list_users(Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
fun(U) -> extract_internal_user_params(U) end,
- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})).
+ all_users()).
-spec list_permissions() -> [rabbit_types:infos()].
@@ -881,8 +939,9 @@ extract_user_permission_params(Keys, #user_permission{
{write, WritePerm},
{read, ReadPerm}]).
-extract_internal_user_params(#internal_user{username = Username, tags = Tags}) ->
- [{user, Username}, {tags, Tags}].
+extract_internal_user_params(User) ->
+ [{user, internal_user:get_username(User)},
+ {tags, internal_user:get_tags(User)}].
match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
@@ -959,3 +1018,59 @@ hashing_algorithm(User, Version) ->
end;
Alg -> rabbit_data_coercion:to_atom(Alg, utf8)
end.
+
+is_over_connection_limit(Username) ->
+ Fun = fun() ->
+ rabbit_connection_tracking:count_tracked_items_in({user, Username})
+ end,
+ is_over_limit(Username, <<"max-connections">>, Fun).
+
+is_over_channel_limit(Username) ->
+ Fun = fun() ->
+ rabbit_channel_tracking:count_tracked_items_in({user, Username})
+ end,
+ is_over_limit(Username, <<"max-channels">>, Fun).
+
+is_over_limit(Username, LimitType, Fun) ->
+ case get_user_limit(Username, LimitType) of
+ undefined -> false;
+ {ok, 0} -> {true, 0};
+ {ok, Limit} ->
+ case Fun() >= Limit of
+ false -> false;
+ true -> {true, Limit}
+ end
+ end.
+
+get_user_limit(Username, LimitType) ->
+ case lookup_user(Username) of
+ {ok, User} ->
+ case rabbit_misc:pget(LimitType, internal_user:get_limits(User)) of
+ undefined -> undefined;
+ N when N < 0 -> undefined;
+ N when N >= 0 -> {ok, N}
+ end;
+ _ ->
+ undefined
+ end.
+
+get_user_limits() ->
+ [{internal_user:get_username(U), internal_user:get_limits(U)} ||
+ U <- all_users(),
+ internal_user:get_limits(U) =/= #{}].
+
+get_user_limits(Username) ->
+ case lookup_user(Username) of
+ {ok, User} -> internal_user:get_limits(User);
+ _ -> undefined
+ end.
+
+notify_limit_set(Username, ActingUser, Term) ->
+ rabbit_event:notify(user_limits_set,
+ [{name, <<"limits">>}, {user_who_performed_action, ActingUser},
+ {username, Username} | maps:to_list(Term)]).
+
+notify_limit_clear(Username, ActingUser) ->
+ rabbit_event:notify(user_limits_cleared,
+ [{name, <<"limits">>}, {user_who_performed_action, ActingUser},
+ {username, Username}]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e57dfb686c..b65912710f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -352,7 +352,7 @@ send_drained(Pid, CTagCredit) ->
-spec list() -> [pid()].
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
+ rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
rabbit_channel, list_local, []).
-spec list_local() -> [pid()].
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl
new file mode 100644
index 0000000000..2dbc4d0cf3
--- /dev/null
+++ b/src/rabbit_channel_tracking.erl
@@ -0,0 +1,289 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_channel_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+-behaviour(rabbit_tracking).
+
+-export([boot/0,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([list/0, list_of_user/1, list_on_node/1,
+ tracked_channel_table_name_for/1,
+ tracked_channel_per_user_table_name_for/1,
+ get_all_tracked_channel_table_names_for_node/1,
+ delete_tracked_channel_user_entry/1]).
+
+-include_lib("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+%%
+%% API
+%%
+
+%% Sets up and resets channel tracking tables for this node.
+-spec boot() -> ok.
+
+boot() ->
+ ensure_tracked_channels_table_for_this_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_table_name_for(node())]),
+ ensure_per_user_tracked_channels_table_for_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
+
+-spec update_tracked(term()) -> ok.
+
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]).
+
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({channel_created, Details}) ->
+ ThisNode = node(),
+ case node(pget(pid, Details)) of
+ ThisNode ->
+ TrackedCh = #tracked_channel{id = TrackedChId} =
+ tracked_channel_from_channel_created_event(Details),
+ try
+ register_tracked(TrackedCh)
+ catch
+ error:{no_exists, _} ->
+ Msg = "Could not register channel ~p for tracking, "
+ "its table is not ready yet or the channel terminated prematurely",
+ rabbit_log_connection:warning(Msg, [TrackedChId]),
+ ok;
+ error:Err ->
+ Msg = "Could not register channel ~p for tracking: ~p",
+ rabbit_log_connection:warning(Msg, [TrackedChId, Err]),
+ ok
+ end;
+ _OtherNode ->
+ %% ignore
+ ok
+ end;
+handle_cast({channel_closed, Details}) ->
+ %% channel has terminated, unregister iff local
+ case get_tracked_channel_by_pid(pget(pid, Details)) of
+ [#tracked_channel{name = Name}] ->
+ unregister_tracked(rabbit_tracking:id(node(), Name));
+ _Other -> ok
+ end;
+handle_cast({connection_closed, ConnDetails}) ->
+ ThisNode= node(),
+ ConnPid = pget(pid, ConnDetails),
+
+ case pget(node, ConnDetails) of
+ ThisNode ->
+ TrackedChs = get_tracked_channels_by_connection_pid(ConnPid),
+ rabbit_log_connection:info(
+ "Closing all channels from connection '~p' "
+ "because it has been closed", [pget(name, ConnDetails)]),
+ shutdown_tracked_items(TrackedChs, undefined),
+ [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) ||
+ #tracked_channel{name = Name} <- TrackedChs],
+ ok;
+ _DifferentNode ->
+ ok
+ end;
+handle_cast({user_deleted, Details}) ->
+ Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_channel_user_entry, [Username]),
+ ok;
+handle_cast({node_deleted, Details}) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info(
+ "Node '~s' was removed from the cluster, deleting"
+ " its channel tracking tables...", [Node]),
+ delete_tracked_channels_table_for_node(Node),
+ delete_per_user_tracked_channels_table_for_node(Node).
+
+-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
+
+register_tracked(TrackedCh =
+ #tracked_channel{node = Node, name = Name, username = Username}) ->
+ ChId = rabbit_tracking:id(Node, Name),
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ChId) of
+ [] ->
+ mnesia:dirty_write(TableName, TrackedCh),
+ mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
+ [#tracked_channel{}] ->
+ ok
+ end,
+ ok.
+
+-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
+
+unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ChId) of
+ [] -> ok;
+ [#tracked_channel{username = Username}] ->
+ mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
+ mnesia:dirty_delete(TableName, ChId)
+ end.
+
+-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
+
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_channel_per_user_table_name_for/1,
+ #tracked_channel_per_user.channel_count, Username,
+ "channels in vhost").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_channel_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, _Args) ->
+ close_channels(TrackedItems).
+
+%% helper functions
+-spec list() -> [rabbit_types:tracked_channel()].
+
+list() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_channel_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
+ end, [], rabbit_nodes:all_running()).
+
+-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
+
+list_of_user(Username) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{username = Username, _ = '_'}).
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
+
+list_on_node(Node) ->
+ try mnesia:dirty_match_object(
+ tracked_channel_table_name_for(Node),
+ #tracked_channel{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+-spec tracked_channel_table_name_for(node()) -> atom().
+
+tracked_channel_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).
+
+-spec tracked_channel_per_user_table_name_for(node()) -> atom().
+
+tracked_channel_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_channel_table_per_user_on_node_~s", [Node])).
+
+%% internal
+ensure_tracked_channels_table_for_this_node() ->
+ ensure_tracked_channels_table_for_node(node()).
+
+ensure_per_user_tracked_channels_table_for_node() ->
+ ensure_per_user_tracked_channels_table_for_node(node()).
+
+%% Create tables
+ensure_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel},
+ {attributes, record_info(fields, tracked_channel)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+ensure_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
+ {attributes, record_info(fields, tracked_channel_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+clear_tracked_channel_tables_for_this_node() ->
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_channel_table_names_for_node(node())].
+
+delete_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").
+
+delete_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node,
+ "per-user tracked channels").
+
+get_all_tracked_channel_table_names_for_node(Node) ->
+ [tracked_channel_table_name_for(Node),
+ tracked_channel_per_user_table_name_for(Node)].
+
+get_tracked_channels_by_connection_pid(ConnPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{connection = ConnPid, _ = '_'}).
+
+get_tracked_channel_by_pid(ChPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{pid = ChPid, _ = '_'}).
+
+delete_tracked_channel_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_channel_per_user_table_name_for/1,
+ Username).
+
+tracked_channel_from_channel_created_event(ChannelDetails) ->
+ Node = node(ChPid = pget(pid, ChannelDetails)),
+ Name = pget(name, ChannelDetails),
+ #tracked_channel{
+ id = rabbit_tracking:id(Node, Name),
+ name = Name,
+ node = Node,
+ vhost = pget(vhost, ChannelDetails),
+ pid = ChPid,
+ connection = pget(connection, ChannelDetails),
+ username = pget(user, ChannelDetails)}.
+
+close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
+ [rabbit_channel:shutdown(ChPid)
+ || #tracked_channel{pid = ChPid} <- TrackedChannels],
+ ok;
+close_channels(_TrackedChannels = []) -> ok.
diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl
new file mode 100644
index 0000000000..83e5343a7e
--- /dev/null
+++ b/src/rabbit_channel_tracking_handler.erl
@@ -0,0 +1,71 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking_handler).
+
+%% This module keeps track of channel creation and termination events
+%% on its local node. Similar to the rabbit_connection_tracking_handler,
+%% the primary goal here is to decouple channel tracking from rabbit_reader
+%% and isolate channel tracking to its own process to avoid blocking connection
+%% creation events. Additionaly, creation events are also non-blocking in that
+%% they spawn a short-live process for updating the tracking tables in realtime.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "channel tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [channel_tracking]},
+ {enables, recovery}]}).
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = channel_created, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}),
+ {ok, State};
+handle_event(#event{type = channel_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}),
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}),
+ {ok, State};
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}),
+ {ok, State};
+%% A node had been deleted from the cluster.
+handle_event(#event{type = node_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 9bfe36b925..33938a90cc 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -7,8 +7,6 @@
-module(rabbit_connection_tracking).
--behaviour(gen_server).
-
%% Abstracts away how tracked connection records are stored
%% and queried.
%%
@@ -17,20 +15,41 @@
%% * rabbit_connection_tracking_handler
%% * rabbit_reader
%% * rabbit_event
+-behaviour(rabbit_tracking).
-export([boot/0,
- ensure_tracked_connections_table_for_node/1,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([ensure_tracked_connections_table_for_node/1,
ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_per_user_tracked_connections_table_for_node/1,
+
ensure_tracked_connections_table_for_this_node/0,
ensure_per_vhost_tracked_connections_table_for_this_node/0,
- tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1,
- delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
+ ensure_per_user_tracked_connections_table_for_this_node/0,
+
+ tracked_connection_table_name_for/1,
+ tracked_connection_per_vhost_table_name_for/1,
+ tracked_connection_per_user_table_name_for/1,
+ get_all_tracked_connection_table_names_for_node/1,
+
+ delete_tracked_connections_table_for_node/1,
+ delete_per_vhost_tracked_connections_table_for_node/1,
+ delete_per_user_tracked_connections_table_for_node/1,
+ delete_tracked_connection_user_entry/1,
+ delete_tracked_connection_vhost_entry/1,
+
clear_tracked_connection_tables_for_this_node/0,
- register_connection/1, unregister_connection/1,
+
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- count_connections_in/1,
lookup/1,
count/0]).
@@ -38,36 +57,47 @@
-import(rabbit_misc, [pget/2]).
--export([start_link/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
-export([close_connections/3]).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
%%
-%% GenServer API
+%% API
%%
-init([]) ->
- {ok, nostate}.
+%% Behaviour callbacks
+
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ ensure_per_user_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p",
+ [tracked_connection_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
-handle_call(_Msg, _From, nostate) ->
- {reply, ok, nostate}.
+-spec update_tracked(term()) -> ok.
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]).
-handle_cast({connection_created, Details}, nostate) ->
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({connection_created, Details}) ->
ThisNode = node(),
case pget(node, Details) of
ThisNode ->
TConn = tracked_connection_from_connection_created(Details),
ConnId = TConn#tracked_connection.id,
try
- register_connection(TConn)
+ register_tracked(TConn)
catch
error:{no_exists, _} ->
Msg = "Could not register connection ~p for tracking, "
@@ -82,84 +112,114 @@ handle_cast({connection_created, Details}, nostate) ->
_OtherNode ->
%% ignore
ok
- end,
- {noreply, nostate};
-handle_cast({connection_closed, Details}, nostate) ->
+ end;
+handle_cast({connection_closed, Details}) ->
ThisNode = node(),
case pget(node, Details) of
ThisNode ->
%% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
%% {pid,<0.1774.0>},
%% {node, rabbit@hostname}]
- unregister_connection(
- {pget(node, Details),
- pget(name, Details)});
+ unregister_tracked(
+ rabbit_tracking:id(ThisNode, pget(name, Details)));
_OtherNode ->
%% ignore
ok
- end,
- {noreply, nostate};
-handle_cast({vhost_deleted, Details}, nostate) ->
+ end;
+handle_cast({vhost_deleted, Details}) ->
VHost = pget(name, Details),
+ %% Schedule vhost entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_vhost_entry, [VHost]),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- close_connections(rabbit_connection_tracking:list(VHost),
- rabbit_misc:format("vhost '~s' is deleted", [VHost])),
- {noreply, nostate};
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list(VHost),
+ rabbit_misc:format("vhost '~s' is deleted", [VHost]));
%% Note: under normal circumstances this will be called immediately
%% after the vhost_deleted above. Therefore we should be careful about
%% what we log and be more defensive.
-handle_cast({vhost_down, Details}, nostate) ->
+handle_cast({vhost_down, Details}) ->
VHost = pget(name, Details),
Node = pget(node, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
" because the vhost is stopping",
[VHost, Node]),
- close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
- rabbit_misc:format("vhost '~s' is down", [VHost])),
- {noreply, nostate};
-handle_cast({user_deleted, Details}, nostate) ->
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost]));
+handle_cast({user_deleted, Details}) ->
Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_user_entry, [Username]),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
- close_connections(rabbit_connection_tracking:list_of_user(Username),
- rabbit_misc:format("user '~s' is deleted", [Username])),
- {noreply, nostate};
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_of_user(Username),
+ rabbit_misc:format("user '~s' is deleted", [Username]));
%% A node had been deleted from the cluster.
-handle_cast({node_deleted, Details}, nostate) ->
+handle_cast({node_deleted, Details}) ->
Node = pget(node, Details),
rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
- rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node),
- rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node),
- {noreply, nostate};
-handle_cast(_Msg, nostate) ->
- {noreply, nostate}.
+ delete_tracked_connections_table_for_node(Node),
+ delete_per_vhost_tracked_connections_table_for_node(Node),
+ delete_per_user_tracked_connections_table_for_node(Node).
-handle_info(_Info, nostate) ->
- {noreply, nostate}.
+-spec register_tracked(rabbit_types:tracked_connection()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
-terminate(_Reason, nostate) ->
+register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] ->
+ mnesia:dirty_write(TableName, Conn),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, 1),
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, 1);
+ [#tracked_connection{}] ->
+ ok
+ end,
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok.
-%%
-%% API
-%%
+unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
+ [#tracked_connection{vhost = VHost, username = Username}] ->
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, -1),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, -1),
+ mnesia:dirty_delete(TableName, ConnId)
+ end.
--spec boot() -> ok.
+-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer().
-%% Sets up and resets connection tracking tables for this
-%% node.
-boot() ->
- ensure_tracked_connections_table_for_this_node(),
- rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
- [tracked_connection_table_name_for(node())]),
- ensure_per_vhost_tracked_connections_table_for_this_node(),
- rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
- [tracked_connection_per_vhost_table_name_for(node())]),
- clear_tracked_connection_tables_for_this_node(),
- ok.
+count_tracked_items_in({vhost, VirtualHost}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_vhost_table_name_for/1,
+ #tracked_connection_per_vhost.connection_count, VirtualHost,
+ "connections in vhost");
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_user_table_name_for/1,
+ #tracked_connection_per_user.connection_count, Username,
+ "connections for user").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_connection_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, Message) ->
+ close_connections(TrackedItems, Message).
+%% Extended API
-spec ensure_tracked_connections_table_for_this_node() -> ok.
@@ -173,6 +233,13 @@ ensure_per_vhost_tracked_connections_table_for_this_node() ->
ensure_per_vhost_tracked_connections_table_for_node(node()).
+-spec ensure_per_user_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_user_tracked_connections_table_for_this_node() ->
+ ensure_per_user_tracked_connections_table_for_node(node()).
+
+
+%% Create tables
-spec ensure_tracked_connections_table_for_node(node()) -> ok.
ensure_tracked_connections_table_for_node(Node) ->
@@ -186,7 +253,6 @@ ensure_tracked_connections_table_for_node(Node) ->
ok
end.
-
-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
ensure_per_vhost_tracked_connections_table_for_node(Node) ->
@@ -200,45 +266,44 @@ ensure_per_vhost_tracked_connections_table_for_node(Node) ->
ok
end.
+-spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user},
+ {attributes, record_info(fields, tracked_connection_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
-spec clear_tracked_connection_tables_for_this_node() -> ok.
clear_tracked_connection_tables_for_this_node() ->
- case mnesia:clear_table(tracked_connection_table_name_for(node())) of
- {atomic, ok} -> ok;
- {aborted, _} -> ok
- end,
- case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of
- {atomic, ok} -> ok;
- {aborted, _} -> ok
- end.
-
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_connection_table_names_for_node(node())].
-spec delete_tracked_connections_table_for_node(node()) -> ok.
delete_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_table_name_for(Node),
- case mnesia:delete_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, {no_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
-
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection").
-spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok.
delete_per_vhost_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:delete_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, {no_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
+ rabbit_tracking:delete_tracked_table(TableName, Node,
+ "per-vhost tracked connection").
+-spec delete_per_user_tracked_connections_table_for_node(node()) -> ok.
+
+delete_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracked_table(TableName, Node,
+ "per-user tracked connection").
-spec tracked_connection_table_name_for(node()) -> atom().
@@ -250,41 +315,23 @@ tracked_connection_table_name_for(Node) ->
tracked_connection_per_vhost_table_name_for(Node) ->
list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+-spec tracked_connection_per_user_table_name_for(node()) -> atom().
--spec register_connection(rabbit_types:tracked_connection()) -> ok.
--dialyzer([{nowarn_function, [register_connection/1]}, race_conditions]).
+tracked_connection_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_connection_table_per_user_on_node_~s", [Node])).
-register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- %% upsert
- case mnesia:dirty_read(TableName, ConnId) of
- [] ->
- mnesia:dirty_write(TableName, Conn),
- mnesia:dirty_update_counter(
- PerVhostTableName, VHost, 1);
- [_Row] ->
- ok
- end,
- ok.
+-spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()].
--spec unregister_connection(rabbit_types:connection_name()) -> ok.
-
-unregister_connection(ConnId = {Node, _Name}) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:dirty_read(TableName, ConnId) of
- [] -> ok;
- [Row] ->
- mnesia:dirty_update_counter(
- PerVhostTableName, Row#tracked_connection.vhost, -1),
- mnesia:dirty_delete(TableName, ConnId)
- end.
+get_all_tracked_connection_table_names_for_node(Node) ->
+ [tracked_connection_table_name_for(Node),
+ tracked_connection_per_vhost_table_name_for(Node),
+ tracked_connection_per_user_table_name_for(Node)].
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
lookup(Name) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
lookup(Name, Nodes).
lookup(_, []) ->
@@ -303,7 +350,7 @@ list() ->
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
+ end, [], rabbit_nodes:all_running()).
-spec count() -> non_neg_integer().
@@ -312,17 +359,14 @@ count() ->
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
Acc + mnesia:table_info(Tab, size)
- end, 0, rabbit_mnesia:cluster_nodes(running)).
+ end, 0, rabbit_nodes:all_running()).
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
list(VHost) ->
- lists:foldl(
- fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
-
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{vhost = VHost, _ = '_'}).
-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
@@ -346,32 +390,23 @@ list_on_node(Node, VHost) ->
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
- lists:foldl(
- fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- Acc ++ mnesia:dirty_match_object(
- Tab,
- #tracked_connection{username = Username, _ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
-
--spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
-
-count_connections_in(VirtualHost) ->
- lists:foldl(fun (Node, Acc) ->
- Tab = tracked_connection_per_vhost_table_name_for(Node),
- try
- N = case mnesia:dirty_read(Tab, VirtualHost) of
- [] -> 0;
- [Val] -> Val#tracked_connection_per_vhost.connection_count
- end,
- Acc + N
- catch _:Err ->
- rabbit_log:error(
- "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n",
- [VirtualHost, Err, Node]),
- Acc
- end
- end, 0, rabbit_mnesia:cluster_nodes(running)).
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{username = Username, _ = '_'}).
+
+%% Internal, delete tracked entries
+
+delete_tracked_connection_vhost_entry(Vhost) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_vhost, exists, [Vhost]},
+ fun tracked_connection_per_vhost_table_name_for/1,
+ Vhost).
+
+delete_tracked_connection_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_connection_per_user_table_name_for/1,
+ Username).
%% Returns a #tracked_connection from connection_created
%% event details.
@@ -419,7 +454,7 @@ tracked_connection_from_connection_created(EventDetails) ->
%% {connected_at,1453214290847}]
Name = pget(name, EventDetails),
Node = pget(node, EventDetails),
- #tracked_connection{id = {Node, Name},
+ #tracked_connection{id = rabbit_tracking:id(Node, Name),
name = Name,
node = Node,
vhost = pget(vhost, EventDetails),
@@ -476,4 +511,3 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
-
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index c7917e0487..8b9e44445f 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -22,7 +22,6 @@
-export([close_connections/3]).
-include_lib("rabbit.hrl").
--import(rabbit_misc, [pget/2]).
-rabbit_boot_step({?MODULE,
[{description, "connection tracking event handler"},
@@ -30,16 +29,9 @@
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
- {requires, [rabbit_connection_tracking]},
+ {requires, [connection_tracking]},
{enables, recovery}]}).
--rabbit_boot_step({rabbit_connection_tracking,
- [{description, "statistics event manager"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_connection_tracking]}},
- {requires, [rabbit_event, rabbit_node_monitor]},
- {enables, ?MODULE}]}).
-
%%
%% API
%%
@@ -48,26 +40,26 @@ init([]) ->
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {connection_created, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({connection_created, Details}),
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({connection_closed, Details}),
{ok, State};
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({vhost_deleted, Details}),
{ok, State};
%% Note: under normal circumstances this will be called immediately
%% after the vhost_deleted above. Therefore we should be careful about
%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({vhost_down, Details}),
{ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({user_deleted, Details}),
{ok, State};
%% A node had been deleted from the cluster.
handle_event(#event{type = node_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({node_deleted, Details}),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
index 63aa9b30f1..a251def11c 100644
--- a/src/rabbit_core_ff.erl
+++ b/src/rabbit_core_ff.erl
@@ -10,7 +10,8 @@
-export([quorum_queue_migration/3,
implicit_default_bindings_migration/3,
virtual_host_metadata_migration/3,
- maintenance_mode_status_migration/3]).
+ maintenance_mode_status_migration/3,
+ user_limits_migration/3]).
-rabbit_feature_flag(
{quorum_queue,
@@ -42,6 +43,13 @@
migration_fun => {?MODULE, maintenance_mode_status_migration}
}}).
+-rabbit_feature_flag(
+ {user_limits,
+ #{desc => "Configure connection and channel limits for a user",
+ stability => stable,
+ migration_fun => {?MODULE, user_limits_migration}
+ }}).
+
%% -------------------------------------------------------------------
%% Quorum queues.
%% -------------------------------------------------------------------
@@ -141,3 +149,18 @@ maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) ->
end;
maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) ->
rabbit_table:exists(rabbit_maintenance:status_table_name()).
+
+%% -------------------------------------------------------------------
+%% User limits.
+%% -------------------------------------------------------------------
+
+user_limits_migration(_FeatureName, _FeatureProps, enable) ->
+ Tab = rabbit_user,
+ rabbit_table:wait([Tab], _Retry = true),
+ Fun = fun(Row) -> internal_user:upgrade_to(internal_user_v2, Row) end,
+ case mnesia:transform_table(Tab, Fun, internal_user:fields(internal_user_v2)) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} -> {error, Reason}
+ end;
+user_limits_migration(_FeatureName, _FeatureProps, is_enabled) ->
+ mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2).
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index f67e6dca48..9920015738 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -700,14 +700,15 @@ vhost_definition(VHost) ->
}.
list_users() ->
- [begin
- {ok, User} = rabbit_auth_backend_internal:lookup_user(pget(user, U)),
- #{<<"name">> => User#internal_user.username,
- <<"password_hash">> => base64:encode(User#internal_user.password_hash),
- <<"hashing_algorithm">> => rabbit_auth_backend_internal:hashing_module_for_user(User),
- <<"tags">> => tags_as_binaries(User#internal_user.tags)
- }
- end || U <- rabbit_auth_backend_internal:list_users()].
+ [user_definition(U) || U <- rabbit_auth_backend_internal:all_users()].
+
+user_definition(User) ->
+ #{<<"name">> => internal_user:get_username(User),
+ <<"password_hash">> => base64:encode(internal_user:get_password_hash(User)),
+ <<"hashing_algorithm">> => rabbit_auth_backend_internal:hashing_module_for_user(User),
+ <<"tags">> => tags_as_binaries(internal_user:get_tags(User)),
+ <<"limits">> => internal_user:get_limits(User)
+ }.
list_runtime_parameters() ->
[runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list(), is_list(P)].
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index e6c251371a..f08a29f829 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -43,7 +43,7 @@ list_local() ->
-spec list() -> [pid()].
list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
+ rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
rabbit_direct, list_local, []).
%%----------------------------------------------------------------------------
@@ -80,7 +80,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
undefined ->
{error, broker_is_booting};
_ ->
- case is_over_connection_limit(VHost, Creds, Pid) of
+ case is_over_vhost_connection_limit(VHost, Creds, Pid) of
true ->
{error, not_allowed};
false ->
@@ -100,7 +100,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
{error, {auth_failure, "Refused"}}
end %% AuthFun()
end %% is_vhost_alive
- end %% is_over_connection_limit
+ end %% is_over_vhost_connection_limit
end;
false -> {error, broker_not_found_on_node}
end.
@@ -146,7 +146,7 @@ is_vhost_alive(VHost, {Username, _Password}, Pid) ->
false
end.
-is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
+is_over_vhost_connection_limit(VHost, {Username, _Password}, Pid) ->
PrintedUsername = case Username of
none -> "";
_ -> Username
@@ -157,7 +157,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
rabbit_log_connection:error(
"Error on Direct connection ~p~n"
"access to vhost '~s' refused for user '~s': "
- "connection limit (~p) is reached",
+ "vhost connection limit (~p) is reached",
[Pid, VHost, PrintedUsername, Limit]),
true
catch
@@ -174,19 +174,30 @@ notify_auth_result(Username, AuthResult, ExtraProps) ->
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
-connect1(User, VHost, Protocol, Pid, Infos) ->
- % Note: peer_host can be either a tuple or
- % a binary if reverse_dns_lookups is enabled
- PeerHost = proplists:get_value(peer_host, Infos),
- AuthzContext = proplists:get_value(variable_map, Infos, #{}),
- try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_core_metrics:connection_created(Pid, Infos),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User, rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = Reason = not_allowed} ->
- {error, Reason}
+connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
+ case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
+ false ->
+ % Note: peer_host can be either a tuple or
+ % a binary if reverse_dns_lookups is enabled
+ PeerHost = proplists:get_value(peer_host, Infos),
+ AuthzContext = proplists:get_value(variable_map, Infos, #{}),
+ try rabbit_access_control:check_vhost_access(User, VHost,
+ {ip, PeerHost}, AuthzContext) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_created(Pid, Infos),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = Reason = not_allowed} ->
+ {error, Reason}
+ end;
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access refused for user '~s': "
+ "user connection limit (~p) is reached",
+ [Pid, Username, Limit]),
+ {error, not_allowed}
end.
-spec start_channel
@@ -195,14 +206,25 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.
-start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, Collector, AmqpParams) ->
- {ok, _, {ChannelPid, _}} =
- supervisor2:start_child(
- rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector, AmqpParams}]),
- {ok, ChannelPid}.
+start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol,
+ User = #user{username = Username}, VHost, Capabilities,
+ Collector, AmqpParams) ->
+ case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
+ false ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
+ User, VHost, Capabilities, Collector, AmqpParams}]),
+ {ok, ChannelPid};
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on direct connection ~p~n"
+ "number of channels opened for user '~s' has reached the "
+ "maximum allowed limit of (~w)",
+ [ConnPid, Username, Limit]),
+ {error, not_allowed}
+ end.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 9676ec3716..02f590e2fb 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -152,7 +152,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
slaves_to_start_on_failure(Q, DeadGMPids) ->
%% In case Mnesia has not caught up yet, filter out nodes we know
%% to be dead..
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) --
+ ClusterNodes = rabbit_nodes:all_running() --
[node(P) || P <- DeadGMPids],
{_, OldNodes, _} = actual_queue_nodes(Q),
{_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
@@ -234,16 +234,16 @@ add_mirror(QName, MirrorNode, SyncMode) ->
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
{ok, _} ->
- try
+ try
SPid = rabbit_amqqueue_sup_sup:start_queue_process(
MirrorNode, Q, slave),
log_info(QName, "Adding mirror on node ~p: ~p~n",
[MirrorNode, SPid]),
rabbit_mirror_queue_slave:go(SPid, SyncMode)
- of
+ of
_ -> ok
catch
- error:QError ->
+ error:QError ->
log_warning(QName,
"Unable to start queue mirror on node '~p'. "
"Target queue supervisor is not running: ~p~n",
@@ -320,7 +320,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) ->
%% a long time without being removed.
update_recoverable(SPids, RS) ->
SNodes = [node(SPid) || SPid <- SPids],
- RunningNodes = rabbit_mnesia:cluster_nodes(running),
+ RunningNodes = rabbit_nodes:all_running(),
AddNodes = SNodes -- RS,
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.
@@ -384,7 +384,7 @@ suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_runni
suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).
%% The third argument exists so we can pull a call to
-%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction
+%% rabbit_nodes:all_running() out of a loop or transaction
%% or both.
suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) ->
Owner = amqqueue:get_exclusive_owner(Q),
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 341e639f45..e0d88c0f5e 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -233,7 +233,7 @@ update_term(_NodeMap, Term) ->
rename_in_running_mnesia(FromNode, ToNode) ->
All = rabbit_mnesia:cluster_nodes(all),
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
{false, true} -> ok;
{true, _} -> exit({old_node_running, FromNode});
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 63760cd2e3..bfb1424b13 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -389,7 +389,7 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
-spec connections() -> [rabbit_types:connection()].
connections() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
+ rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(),
rabbit_networking, connections_local, []).
-spec local_connections() -> [rabbit_types:connection()].
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 3408ea5a51..b56180c54c 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -162,7 +162,7 @@ notify_node_up() ->
-spec notify_joined_cluster() -> 'ok'.
notify_joined_cluster() ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ Nodes = rabbit_nodes:all_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{joined_cluster, node(), rabbit_mnesia:node_type()}),
ok.
@@ -170,7 +170,7 @@ notify_joined_cluster() ->
-spec notify_left_cluster(node()) -> 'ok'.
notify_left_cluster(Node) ->
- Nodes = rabbit_mnesia:cluster_nodes(running),
+ Nodes = rabbit_nodes:all_running(),
gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}),
ok.
@@ -375,7 +375,7 @@ handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(notify_node_up, State = #state{guid = GUID}) ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ Nodes = rabbit_nodes:all_running() -- [node()],
gen_server:abcast(Nodes, ?SERVER,
{node_up, node(), rabbit_mnesia:node_type(), GUID}),
%% register other active rabbits with this rabbit
@@ -425,7 +425,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID},
State = #state{guid = MyGUID,
node_guids = GUIDs}) ->
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso
+ case lists:member(Node, rabbit_nodes:all_running()) andalso
maps:find(Node, GUIDs) =:= {ok, NodeGUID} of
true -> spawn_link( %%[1]
fun () ->
@@ -572,7 +572,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID,
Node, node(), DownGUID, CheckGUID, MyGUID})
end,
case maps:find(Node, GUIDs) of
- {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running)
+ {ok, DownGUID} -> Alive = rabbit_nodes:all_running()
-- [node(), Node],
[case maps:find(N, GUIDs) of
{ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
@@ -771,7 +771,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
%% going away. It's only safe to forget anything about partitions when
%% there are no partitions.
Down = Partitions -- alive_rabbit_nodes(),
- NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
+ NoLongerPartitioned = rabbit_nodes:all_running(),
Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
[] -> [];
_ -> Partitions
@@ -853,7 +853,7 @@ disconnect(Node) ->
%%--------------------------------------------------------------------
%% mnesia:system_info(db_nodes) (and hence
-%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes
+%% rabbit_nodes:all_running()) does not return all nodes
%% when partitioned, just those that we are sharing Mnesia state
%% with. So we have a small set of replacement functions
%% here. "rabbit" in a function's name implies we test if the rabbit
@@ -917,7 +917,7 @@ ping_all() ->
ok.
possibly_partitioned_nodes() ->
- alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
+ alive_rabbit_nodes() -- rabbit_nodes:all_running().
startup_log([]) ->
rabbit_log:info("Starting rabbit_node_monitor~n", []);
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6eff110ca3..915cf9d527 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -862,7 +862,7 @@ add_member(VHost, Name, Node, Timeout) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
QNodes = get_nodes(Q),
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
+ case lists:member(Node, rabbit_nodes:all_running()) of
false ->
{error, node_not_running};
true ->
@@ -993,7 +993,7 @@ shrink_all(Node) ->
[{rabbit_amqqueue:name(),
{ok, pos_integer()} | {error, pos_integer(), term()}}].
grow(Node, VhostSpec, QueueSpec, Strategy) ->
- Running = rabbit_mnesia:cluster_nodes(running),
+ Running = rabbit_nodes:all_running(),
[begin
Size = length(get_nodes(Q)),
QName = amqqueue:get_name(Q),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 79895d95fa..f9697c96e5 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -909,17 +909,27 @@ create_channel(Channel,
#connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
- user = User,
vhost = VHost,
- capabilities = Capabilities}} = State) ->
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}.
+ capabilities = Capabilities,
+ user = #user{username = Username} = User}
+ } = State) ->
+ case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
+ false ->
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}};
+ {true, Limit} ->
+ {error, rabbit_misc:amqp_error(not_allowed,
+ "number of channels opened for user '~s' has reached "
+ "the maximum allowed user limit of (~w)",
+ [Username, Limit], 'none')}
+ end.
channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
case get({ch_pid, ChPid}) of
@@ -1218,7 +1228,8 @@ handle_method0(#'connection.open'{virtual_host = VHost},
sock = Sock,
throttle = Throttle}) ->
- ok = is_over_connection_limit(VHost, User),
+ ok = is_over_vhost_connection_limit(VHost, User),
+ ok = is_over_user_connection_limit(User),
ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}),
ok = is_vhost_alive(VHost, User),
NewConnection = Connection#connection{vhost = VHost},
@@ -1317,7 +1328,7 @@ is_vhost_alive(VHostPath, User) ->
[VHostPath, User#user.username, VHostPath])
end.
-is_over_connection_limit(VHostPath, User) ->
+is_over_vhost_connection_limit(VHostPath, User) ->
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
false -> ok;
{true, Limit} -> rabbit_misc:protocol_error(not_allowed,
@@ -1329,6 +1340,14 @@ is_over_connection_limit(VHostPath, User) ->
rabbit_misc:protocol_error(not_allowed, "vhost ~s not found", [VHostPath])
end.
+is_over_user_connection_limit(#user{username = Username}) ->
+ case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
+ false -> ok;
+ {true, Limit} -> rabbit_misc:protocol_error(not_allowed,
+ "Connection refused for user '~s': "
+ "user connection limit (~p) is reached",
+ [Username, Limit])
+ end.
validate_negotiated_integer_value(Field, Min, ClientValue) ->
ServerValue = get_env(Field),
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 05f9a8d381..7df8844960 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -295,9 +295,9 @@ definitions(ram) ->
definitions() ->
[{rabbit_user,
[{record_name, internal_user},
- {attributes, record_info(fields, internal_user)},
+ {attributes, internal_user:fields()},
{disc_copies, [node()]},
- {match, #internal_user{_='_'}}]},
+ {match, internal_user:pattern_match_all()}]},
{rabbit_user_permission,
[{record_name, user_permission},
{attributes, record_info(fields, user_permission)},
diff --git a/src/rabbit_tracking.erl b/src/rabbit_tracking.erl
new file mode 100644
index 0000000000..a124d20226
--- /dev/null
+++ b/src/rabbit_tracking.erl
@@ -0,0 +1,103 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_tracking).
+
+%% Common behaviour and processing functions for tracking components
+%%
+%% See in use:
+%% * rabbit_connection_tracking
+%% * rabbit_channel_tracking
+
+-callback boot() -> ok.
+-callback update_tracked(term()) -> ok.
+-callback handle_cast(term()) -> ok.
+-callback register_tracked(
+ rabbit_types:tracked_connection() |
+ rabbit_types:tracked_channel()) -> 'ok'.
+-callback unregister_tracked(
+ rabbit_types:tracked_connection_id() |
+ rabbit_types:tracked_channel_id()) -> 'ok'.
+-callback count_tracked_items_in(term()) -> non_neg_integer().
+-callback clear_tracking_tables() -> 'ok'.
+-callback shutdown_tracked_items(list(), term()) -> ok.
+
+-export([id/2, count_tracked_items/4, match_tracked_items/2,
+ clear_tracking_table/1, delete_tracking_table/3,
+ delete_tracked_entry/3]).
+
+%%----------------------------------------------------------------------------
+
+-spec id(atom(), term()) ->
+ rabbit_types:tracked_connection_id() | rabbit_types:tracked_channel_id().
+
+id(Node, Name) -> {Node, Name}.
+
+-spec count_tracked_items(function(), integer(), term(), string()) ->
+ non_neg_integer().
+
+count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) ->
+ lists:foldl(fun (Node, Acc) ->
+ Tab = TableNameFun(Node),
+ try
+ N = case mnesia:dirty_read(Tab, Key) of
+ [] -> 0;
+ [Val] ->
+ element(CountRecPosition, Val)
+ end,
+ Acc + N
+ catch _:Err ->
+ rabbit_log:error(
+ "Failed to fetch number of ~p ~p on node ~p:~n~p~n",
+ [ContextMsg, Key, Node, Err]),
+ Acc
+ end
+ end, 0, rabbit_nodes:all_running()).
+
+-spec match_tracked_items(function(), tuple()) -> term().
+
+match_tracked_items(TableNameFun, MatchSpec) ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = TableNameFun(Node),
+ Acc ++ mnesia:dirty_match_object(
+ Tab,
+ MatchSpec)
+ end, [], rabbit_nodes:all_running()).
+
+-spec clear_tracking_table(atom()) -> ok.
+
+clear_tracking_table(TableName) ->
+ case mnesia:clear_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end.
+
+-spec delete_tracking_table(atom(), node(), string()) -> ok.
+
+delete_tracking_table(TableName, Node, ContextMsg) ->
+ case mnesia:delete_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, {no_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to delete a ~p table for node ~p: ~p",
+ [ContextMsg, Node, Error]),
+ ok
+ end.
+
+-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok.
+
+delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) ->
+ ClusterNodes = rabbit_nodes:all_running(),
+ ExistsInCluster =
+ lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes),
+ case ExistsInCluster of
+ false ->
+ [mnesia:dirty_delete(TableNameFun(Node), Key) || Node <- ClusterNodes];
+ true ->
+ ok
+ end.
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index ba72ecaebc..b1b128fecc 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -145,7 +145,7 @@ run_mnesia_upgrades(Upgrades, AllNodes) ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ AfterUs = rabbit_nodes:all_running() -- [node()],
case {node_type_legacy(), AfterUs} of
{disc, []} ->
primary;
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index ce5532ba54..bee01f3054 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -90,7 +90,8 @@ is_over_connection_limit(VirtualHost) ->
%% with limit = 0, no connections are allowed
{ok, 0} -> {true, 0};
{ok, Limit} when is_integer(Limit) andalso Limit > 0 ->
- ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost),
+ ConnectionCount =
+ rabbit_connection_tracking:count_tracked_items_in({vhost, VirtualHost}),
case ConnectionCount >= Limit of
false -> false;
true -> {true, Limit}
diff --git a/test/per_user_connection_channel_limit_SUITE.erl b/test/per_user_connection_channel_limit_SUITE.erl
new file mode 100644
index 0000000000..35745d65f8
--- /dev/null
+++ b/test/per_user_connection_channel_limit_SUITE.erl
@@ -0,0 +1,1625 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(per_user_connection_channel_limit_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_2_direct}
+ ].
+
+groups() ->
+ ClusterSize1Tests = [
+ most_basic_single_node_connection_and_channel_count,
+ single_node_single_user_connection_and_channel_count,
+ single_node_multiple_users_connection_and_channel_count,
+ single_node_list_in_user,
+ single_node_single_user_limit,
+ single_node_single_user_zero_limit,
+ single_node_single_user_clear_limits,
+ single_node_multiple_users_clear_limits,
+ single_node_multiple_users_limit,
+ single_node_multiple_users_zero_limit
+
+ ],
+ ClusterSize2Tests = [
+ most_basic_cluster_connection_and_channel_count,
+ cluster_single_user_connection_and_channel_count,
+ cluster_multiple_users_connection_and_channel_count,
+ cluster_node_restart_connection_and_channel_count,
+ cluster_node_list_on_node,
+ cluster_single_user_limit,
+ cluster_single_user_limit2,
+ cluster_single_user_zero_limit,
+ cluster_single_user_clear_limits,
+ cluster_multiple_users_clear_limits,
+ cluster_multiple_users_zero_limit
+ ],
+ [
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2);
+
+init_per_group(cluster_rename, Config) ->
+ init_per_multinode_group(cluster_rename, Config, 2).
+
+init_per_multinode_group(Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ case Group of
+ cluster_rename ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config1;
+ _ ->
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps())
+ end.
+
+end_per_group(cluster_rename, Config) ->
+ % The broker is managed by {init,end}_per_testcase().
+ Config;
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ Config.
+
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+clear_all_channel_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_channel_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+most_basic_single_node_connection_and_channel_count(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn] = open_connections(Config, [0]),
+ [Chan] = open_channels(Conn, 1),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 1
+ end),
+ close_channels([Chan]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+ close_connections([Conn]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end).
+
+single_node_single_user_connection_and_channel_count(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [0]),
+ [Chan1] = open_channels(Conn1, 1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 1
+ end),
+ close_channels([Chan1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [0]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 2 andalso
+ count_channels_of_user(Config, Username) =:= 10
+ end),
+
+ [Conn4] = open_connections(Config, [0]),
+ _Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 3 andalso
+ count_channels_of_user(Config, Username) =:= 15
+ end),
+
+ close_connections([Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 2 andalso
+ count_channels_of_user(Config, Username) =:= 10
+ end),
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 3 andalso
+ count_channels_of_user(Config, Username) =:= 15
+ end),
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end).
+
+single_node_multiple_users_connection_and_channel_count(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username1) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [{0, Username1}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+ close_channels(Chans1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0
+ end),
+ ?assertEqual(0, count_channels_of_user(Config, Username1)),
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username1) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 1 andalso
+ count_channels_of_user(Config, Username2) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [{0, Username1}]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 1 andalso
+ count_channels_of_user(Config, Username2) =:= 5
+ end),
+
+ [Conn4] = open_connections(Config, [{0, Username1}]),
+ _Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 2 andalso
+ count_channels_of_user(Config, Username1) =:= 10
+ end),
+
+ close_connections([Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+
+ [Conn5] = open_connections(Config, [{0, Username2}]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 2 andalso
+ count_channels_of_user(Config, Username2) =:= 10
+ end),
+
+ [Conn6] = open_connections(Config, [{0, Username2}]),
+ Chans6 = [_|_] = open_channels(Conn6, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 3 andalso
+ count_channels_of_user(Config, Username2) =:= 15
+ end),
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5 ++ Chans6),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username1),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2).
+
+single_node_list_in_user(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ ?assertEqual(0, length(connections_in(Config, Username1))),
+ ?assertEqual(0, length(connections_in(Config, Username2))),
+ ?assertEqual(0, length(channels_in(Config, Username1))),
+ ?assertEqual(0, length(channels_in(Config, Username2))),
+
+ [Conn1] = open_connections(Config, [{0, Username1}]),
+ [Chan1] = open_channels(Conn1, 1),
+ [#tracked_connection{username = Username1}] = connections_in(Config, Username1),
+ [#tracked_channel{username = Username1}] = channels_in(Config, Username1),
+ close_channels([Chan1]),
+ ?assertEqual(0, length(channels_in(Config, Username1))),
+ close_connections([Conn1]),
+ ?assertEqual(0, length(connections_in(Config, Username1))),
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ [Chan2] = open_channels(Conn2, 1),
+ [#tracked_connection{username = Username2}] = connections_in(Config, Username2),
+ [#tracked_channel{username = Username2}] = channels_in(Config, Username2),
+
+ [Conn3] = open_connections(Config, [{0, Username1}]),
+ [Chan3] = open_channels(Conn3, 1),
+ [#tracked_connection{username = Username1}] = connections_in(Config, Username1),
+ [#tracked_channel{username = Username1}] = channels_in(Config, Username1),
+
+ [Conn4] = open_connections(Config, [{0, Username1}]),
+ [_Chan4] = open_channels(Conn4, 1),
+ close_connections([Conn4]),
+ [#tracked_connection{username = Username1}] = connections_in(Config, Username1),
+ [#tracked_channel{username = Username1}] = channels_in(Config, Username1),
+
+ [Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]),
+ [Chan5] = open_channels(Conn5, 1),
+ [Chan6] = open_channels(Conn6, 1),
+ [<<"guest1">>, <<"guest2">>] =
+ lists:usort(lists:map(fun (#tracked_connection{username = V}) -> V end,
+ all_connections(Config))),
+ [<<"guest1">>, <<"guest2">>] =
+ lists:usort(lists:map(fun (#tracked_channel{username = V}) -> V end,
+ all_channels(Config))),
+
+ close_channels([Chan2, Chan3, Chan5, Chan6]),
+ ?assertEqual(0, length(all_channels(Config))),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ ?assertEqual(0, length(all_connections(Config))),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username1),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2).
+
+most_basic_cluster_connection_and_channel_count(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+ ?assertEqual(1, count_connections_of_user(Config, Username)),
+ ?assertEqual(5, count_channels_of_user(Config, Username)),
+
+ [Conn2] = open_connections(Config, [1]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(2, count_connections_of_user(Config, Username)),
+ ?assertEqual(10, count_channels_of_user(Config, Username)),
+
+ [Conn3] = open_connections(Config, [1]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ ?assertEqual(3, count_connections_of_user(Config, Username)),
+ ?assertEqual(15, count_channels_of_user(Config, Username)),
+
+ close_channels(Chans1 ++ Chans2 ++ Chans3),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ close_connections([Conn1, Conn2, Conn3]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)).
+
+cluster_single_user_connection_and_channel_count(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [0]),
+ _Chans1 = [_|_] = open_channels(Conn1, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [1]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 2 andalso
+ count_channels_of_user(Config, Username) =:= 10
+ end),
+
+ [Conn4] = open_connections(Config, [1]),
+ Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 3 andalso
+ count_channels_of_user(Config, Username) =:= 15
+ end),
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end).
+
+cluster_multiple_users_connection_and_channel_count(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [{0, Username1}]),
+ _Chans1 = [_|_] = open_channels(Conn1, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username1) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 1 andalso
+ count_channels_of_user(Config, Username2) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [{1, Username1}]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 1 andalso
+ count_channels_of_user(Config, Username2) =:= 5
+ end),
+
+ [Conn4] = open_connections(Config, [{0, Username1}]),
+ _Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 2 andalso
+ count_channels_of_user(Config, Username1) =:= 10
+ end),
+
+ close_connections([Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_channels_of_user(Config, Username1) =:= 5
+ end),
+
+ [Conn5] = open_connections(Config, [{1, Username2}]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 2 andalso
+ count_channels_of_user(Config, Username2) =:= 10
+ end),
+
+ [Conn6] = open_connections(Config, [{0, Username2}]),
+ Chans6 = [_|_] = open_channels(Conn6, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 3 andalso
+ count_channels_of_user(Config, Username2) =:= 15
+ end),
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5 ++ Chans6),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn5, Conn6]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username1),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2).
+
+cluster_node_restart_connection_and_channel_count(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [0]),
+ _Chans1 = [_|_] = open_channels(Conn1, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [1]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 2 andalso
+ count_channels_of_user(Config, Username) =:= 10
+ end),
+
+ [Conn4] = open_connections(Config, [1]),
+ _Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 3 andalso
+ count_channels_of_user(Config, Username) =:= 15
+ end),
+
+ [Conn5] = open_connections(Config, [1]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 4 andalso
+ count_channels_of_user(Config, Username) =:= 20
+ end),
+
+ rabbit_ct_broker_helpers:restart_broker(Config, 1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1 andalso
+ count_channels_of_user(Config, Username) =:= 5
+ end),
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end).
+
+cluster_node_list_on_node(Config) ->
+ [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(all_connections(Config)) =:= 0 andalso
+ length(all_channels(Config)) =:= 0 andalso
+ length(connections_on_node(Config, 0)) =:= 0 andalso
+ length(channels_on_node(Config, 0)) =:= 0
+ end),
+
+ [Conn1] = open_connections(Config, [0]),
+ _Chans1 = [_|_] = open_channels(Conn1, 5),
+ [#tracked_connection{node = A}] = connections_on_node(Config, 0),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length([Ch || Ch <- channels_on_node(Config, 0), Ch#tracked_channel.node =:= A]) =:= 5
+ end),
+ close_connections([Conn1]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(connections_on_node(Config, 0)) =:= 0 andalso
+ length(channels_on_node(Config, 0)) =:= 0
+ end),
+
+ [Conn2] = open_connections(Config, [1]),
+ _Chans2 = [_|_] = open_channels(Conn2, 5),
+ [#tracked_connection{node = B}] = connections_on_node(Config, 1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length([Ch || Ch <- channels_on_node(Config, 1), Ch#tracked_channel.node =:= B]) =:= 5
+ end),
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(connections_on_node(Config, 0)) =:= 1 andalso
+ length(channels_on_node(Config, 0)) =:= 5
+ end),
+
+ [Conn4] = open_connections(Config, [1]),
+ _Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(connections_on_node(Config, 1)) =:= 2 andalso
+ length(channels_on_node(Config, 1)) =:= 10
+ end),
+
+ close_connections([Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(connections_on_node(Config, 1)) =:= 1 andalso
+ length(channels_on_node(Config, 1)) =:= 5
+ end),
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(connections_on_node(Config, 0)) =:= 2 andalso
+ length(channels_on_node(Config, 0)) =:= 10
+ end),
+
+ rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ await_running_node_refresh(Config, 0),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(all_connections(Config)) =:= 2 andalso
+ length(all_channels(Config)) =:= 10
+ end),
+
+ close_channels(Chans3 ++ Chans5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(all_channels(Config)) =:= 0
+ end),
+
+ close_connections([Conn3, Conn5]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ length(all_connections(Config)) =:= 0
+ end),
+
+ rabbit_ct_broker_helpers:start_broker(Config, 1).
+
+single_node_single_user_limit(Config) ->
+ single_node_single_user_limit_with(Config, 5, 25),
+ single_node_single_user_limit_with(Config, -1, -1).
+
+single_node_single_user_limit_with(Config, ConnLimit, ChLimit) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 3, 15),
+
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ [Conn1, Conn2, Conn3] = Conns1 = open_connections(Config, [0, 0, 0]),
+ [_Chans1, Chans2, Chans3] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_channel_is_rejected(Conn1),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn2) andalso
+ is_process_alive(Conn3)
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, ConnLimit, ChLimit),
+ [Conn4, Conn5] = Conns2 = open_connections(Config, [0, 0]),
+ [Chans4, Chans5] = [open_channels(Conn, 5) || Conn <- Conns2],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4, Conn5]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+single_node_single_user_zero_limit(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 0, 0),
+
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config),
+ expect_that_client_connection_is_rejected(Config),
+ expect_that_client_connection_is_rejected(Config),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username, 1, 0),
+ [ConnA] = open_connections(Config, [0]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1
+ end),
+ expect_that_client_channel_is_rejected(ConnA),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(ConnA) =:= false andalso
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1),
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [0, 0]),
+ [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 2 andalso
+ count_channels_of_user(Config, Username) =:= 10
+ end),
+
+ close_channels(Chans1 ++ Chans2),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)).
+
+single_node_single_user_clear_limits(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 3, 15),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ [Conn1, Conn2, Conn3] = Conns1 = open_connections(Config, [0, 0, 0]),
+ [_Chans1, Chans2, Chans3] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_channel_is_rejected(Conn1),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn2) andalso
+ is_process_alive(Conn3)
+ end),
+
+ %% reach limit again
+ [Conn4] = open_connections(Config, [{0, Username}]),
+ Chans4 = [_|_] = open_channels(Conn4, 5),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 3 andalso
+ count_channels_of_user(Config, Username) =:= 15
+ end),
+
+ clear_all_user_limits(Config, Username),
+
+ [Conn5, Conn6, Conn7] = Conns2 = open_connections(Config, [0, 0, 0]),
+ [Chans5, Chans6, Chans7] = [open_channels(Conn, 5) || Conn <- Conns2],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++ Chans7),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5, Conn6, Conn7]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+single_node_multiple_users_clear_limits(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ set_user_connection_and_channel_limit(Config, Username1, 0, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 0, 0),
+
+ ?assertEqual(0, count_connections_of_user(Config, Username1)),
+ ?assertEqual(0, count_connections_of_user(Config, Username2)),
+ ?assertEqual(0, count_channels_of_user(Config, Username1)),
+ ?assertEqual(0, count_channels_of_user(Config, Username2)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+ expect_that_client_connection_is_rejected(Config, 0, Username2),
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username1, 1, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 1, 0),
+ [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {0, Username2}]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1
+ end),
+ expect_that_client_channel_is_rejected(ConnA),
+ expect_that_client_channel_is_rejected(ConnB),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(ConnA) =:= false andalso
+ is_process_alive(ConnB) =:= false
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ clear_all_user_limits(Config, Username1),
+ set_user_channel_limit_only(Config, Username2, -1),
+ set_user_connection_limit_only(Config, Username2, -1),
+
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username1}, {0, Username1}]),
+ [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ close_channels(Chans1 ++ Chans2),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1).
+
+single_node_multiple_users_limit(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ set_user_connection_and_channel_limit(Config, Username1, 2, 10),
+ set_user_connection_and_channel_limit(Config, Username2, 2, 10),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [
+ {0, Username1},
+ {0, Username1},
+ {0, Username2},
+ {0, Username2}]),
+
+ [_Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+ expect_that_client_connection_is_rejected(Config, 0, Username2),
+ expect_that_client_channel_is_rejected(Conn1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn3) =:= true
+ end),
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 5),
+
+ set_user_connection_and_channel_limit(Config, Username1, 5, 25),
+ set_user_connection_and_channel_limit(Config, Username2, -10, -50),
+
+ [Conn6, Conn7, Conn8, Conn9, Conn10] = Conns2 = open_connections(Config, [
+ {0, Username1},
+ {0, Username1},
+ {0, Username1},
+ {0, Username2},
+ {0, Username2}]),
+
+ [Chans6, Chans7, Chans8, Chans9, Chans10] = [open_channels(Conn, 5) || Conn <- Conns2],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++
+ Chans7 ++ Chans8 ++ Chans9 ++ Chans10),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5, Conn6,
+ Conn7, Conn8, Conn9, Conn10]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username1),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2).
+
+
+single_node_multiple_users_zero_limit(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ set_user_connection_and_channel_limit(Config, Username1, 0, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 0, 0),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+ expect_that_client_connection_is_rejected(Config, 0, Username2),
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username1, 1, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 1, 0),
+ [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {0, Username2}]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1
+ end),
+ expect_that_client_channel_is_rejected(ConnA),
+ expect_that_client_channel_is_rejected(ConnB),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(ConnA) =:= false andalso
+ is_process_alive(ConnB) =:= false
+ end),
+
+ ?assertEqual(false, is_process_alive(ConnA)),
+ ?assertEqual(false, is_process_alive(ConnB)),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username1}, {0, Username1}]),
+ [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ close_channels(Chans1 ++ Chans2),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1).
+
+
+cluster_single_user_limit(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_limit_only(Config, Username, 2),
+ set_user_channel_limit_only(Config, Username, 10),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ %% here connections and channels are opened to different nodes
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {1, Username}]),
+ [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, Username),
+ expect_that_client_connection_is_rejected(Config, 1, Username),
+ expect_that_client_channel_is_rejected(Conn1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn2) =:= true
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, 5, 25),
+
+ [Conn3, Conn4] = Conns2 = open_connections(Config, [{0, Username}, {0, Username}]),
+ [Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns2],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ close_connections([Conn2, Conn3, Conn4]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+cluster_single_user_limit2(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 2, 10),
+
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ %% here a limit is reached on one node first
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {0, Username}]),
+ [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, Username),
+ expect_that_client_connection_is_rejected(Config, 1, Username),
+ expect_that_client_channel_is_rejected(Conn1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn2) =:= true
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, 5, 25),
+
+ [Conn3, Conn4, Conn5, Conn6, {error, not_allowed}] = open_connections(Config, [
+ {1, Username},
+ {1, Username},
+ {1, Username},
+ {1, Username},
+ {1, Username}]),
+
+ [Chans3, Chans4, Chans5, Chans6, [{error, not_allowed}]] =
+ [open_channels(Conn, 1) || Conn <- [Conn3, Conn4, Conn5, Conn6, Conn1]],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5, Conn6]),
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+
+cluster_single_user_zero_limit(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 0, 0),
+
+ ?assertEqual(0, count_connections_of_user(Config, Username)),
+ ?assertEqual(0, count_channels_of_user(Config, Username)),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0),
+ expect_that_client_connection_is_rejected(Config, 1),
+ expect_that_client_connection_is_rejected(Config, 0),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username, 1, 0),
+ [ConnA] = open_connections(Config, [0]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 1
+ end),
+ expect_that_client_channel_is_rejected(ConnA),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+ ?assertEqual(false, is_process_alive(ConnA)),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1),
+ [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [0, 1, 0, 1]),
+ [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+cluster_single_user_clear_limits(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ set_user_connection_and_channel_limit(Config, Username, 2, 10),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0 andalso
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ %% here a limit is reached on one node first
+ [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {0, Username}]),
+ [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ %% we've crossed the limit
+ expect_that_client_connection_is_rejected(Config, 0, Username),
+ expect_that_client_connection_is_rejected(Config, 1, Username),
+ expect_that_client_channel_is_rejected(Conn1),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(Conn1) =:= false andalso
+ is_process_alive(Conn2) =:= true
+ end),
+ clear_all_user_limits(Config, Username),
+
+ [Conn3, Conn4, Conn5, Conn6, Conn7] = open_connections(Config, [
+ {1, Username},
+ {1, Username},
+ {1, Username},
+ {1, Username},
+ {1, Username}]),
+
+ [Chans3, Chans4, Chans5, Chans6, Chans7] =
+ [open_channels(Conn, 1) || Conn <- [Conn3, Conn4, Conn5, Conn6, Conn7]],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++ Chans7),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username) =:= 0
+ end),
+
+ close_connections([Conn2, Conn3, Conn4, Conn5, Conn6, Conn7]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+cluster_multiple_users_clear_limits(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ set_user_connection_and_channel_limit(Config, Username1, 0, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 0, 0),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+ expect_that_client_connection_is_rejected(Config, 0, Username2),
+ expect_that_client_connection_is_rejected(Config, 1, Username1),
+ expect_that_client_connection_is_rejected(Config, 1, Username2),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username1, 1, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 1, 0),
+ [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {1, Username2}]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 1 andalso
+ count_connections_of_user(Config, Username2) =:= 1
+ end),
+ expect_that_client_channel_is_rejected(ConnA),
+
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ is_process_alive(ConnA) =:= false andalso
+ is_process_alive(ConnB) =:= true
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 1
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+ close_connections([ConnB]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+ ?assertEqual(false, is_process_alive(ConnB)),
+
+ clear_all_user_limits(Config, Username1),
+ clear_all_user_limits(Config, Username2),
+
+ [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [
+ {0, Username1},
+ {0, Username2},
+ {1, Username1},
+ {1, Username2}]),
+
+ [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1).
+
+cluster_multiple_users_zero_limit(Config) ->
+ Username1 = <<"guest1">>,
+ Username2 = <<"guest2">>,
+
+ set_up_user(Config, Username1),
+ set_up_user(Config, Username2),
+
+ set_user_connection_and_channel_limit(Config, Username1, 0, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 0, 0),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 0
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+
+ %% with limit = 0 no connections are allowed
+ expect_that_client_connection_is_rejected(Config, 0, Username1),
+ expect_that_client_connection_is_rejected(Config, 0, Username2),
+ expect_that_client_connection_is_rejected(Config, 1, Username1),
+ expect_that_client_connection_is_rejected(Config, 1, Username2),
+
+ %% with limit = 0 no channels are allowed
+ set_user_connection_and_channel_limit(Config, Username1, 1, 0),
+ set_user_connection_and_channel_limit(Config, Username2, 1, 0),
+ [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {1, Username2}]),
+
+ expect_that_client_channel_is_rejected(ConnA),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username1) =:= 0 andalso
+ count_connections_of_user(Config, Username2) =:= 1
+ end),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_channels_of_user(Config, Username1) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+ ?assertEqual(false, is_process_alive(ConnA)),
+ ?assertEqual(true, is_process_alive(ConnB)),
+ close_connections([ConnB]),
+ rabbit_ct_helpers:await_condition(
+ fun () ->
+ count_connections_of_user(Config, Username2) =:= 0 andalso
+ count_channels_of_user(Config, Username2) =:= 0
+ end),
+ ?assertEqual(false, is_process_alive(ConnB)),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1),
+
+ [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [
+ {0, Username1},
+ {0, Username2},
+ {1, Username1},
+ {1, Username2}]),
+
+ [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1],
+
+ close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4),
+ ?assertEqual(0, count_channels_of_user(Config, Username1)),
+ ?assertEqual(0, count_channels_of_user(Config, Username2)),
+
+ close_connections([Conn1, Conn2, Conn3, Conn4]),
+ ?assertEqual(0, count_connections_of_user(Config, Username1)),
+ ?assertEqual(0, count_connections_of_user(Config, Username2)),
+
+ set_user_connection_and_channel_limit(Config, Username1, -1, -1),
+ set_user_connection_and_channel_limit(Config, Username2, -1, -1).
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndUsers) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
+ Conns = lists:map(fun
+ ({Node, User}) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
+ User, User);
+ (Node) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
+ end, NodesAndUsers),
+ timer:sleep(100),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns).
+
+open_channels(Conn, N) ->
+ [open_channel(Conn) || _ <- lists:seq(1, N)].
+
+open_channel(Conn) when is_pid(Conn) ->
+ try amqp_connection:open_channel(Conn) of
+ {ok, Ch} -> Ch
+ catch
+ _:_Error -> {error, not_allowed}
+ end.
+
+close_channels(Channels = [_|_]) ->
+ [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels].
+
+count_connections_of_user(Config, Username) ->
+ count_connections_in(Config, Username, 0).
+count_connections_in(Config, Username, NodeIndex) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+ count_channels_of_user(Config, Username) ->
+ count_channels_in(Config, Username, 0).
+ count_channels_in(Config, Username, NodeIndex) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ count_tracked_items_in, [{user, Username}]).
+
+connections_in(Config, Username) ->
+ connections_in(Config, 0, Username).
+connections_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+channels_in(Config, Username) ->
+ channels_in(Config, 0, Username).
+channels_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_of_user, [Username]).
+
+connections_on_node(Config) ->
+ connections_on_node(Config, 0).
+connections_on_node(Config, NodeIndex) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
+ tracked_items_on_node(Config, NodeIndex, rabbit_connection_tracking, Node).
+
+channels_on_node(Config) ->
+ channels_on_node(Config, 0).
+channels_on_node(Config, NodeIndex) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
+ tracked_items_on_node(Config, NodeIndex, rabbit_channel_tracking, Node).
+
+tracked_items_on_node(Config, NodeIndex, TrackingMod, NodeForListing) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_on_node, [NodeForListing]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_connection_tracking).
+
+all_channels(Config) ->
+ all_channels(Config, 0).
+all_channels(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_channel_tracking).
+
+all_tracked_items(Config, NodeIndex, TrackingMod) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list, []).
+
+set_up_user(Config, Username) ->
+ VHost = proplists:get_value(rmq_vhost, Config),
+ rabbit_ct_broker_helpers:add_user(Config, Username),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username, VHost),
+ set_user_connection_and_channel_limit(Config, Username, -1, -1).
+
+set_user_connection_and_channel_limit(Config, Username, ConnLimit, ChLimit) ->
+ set_user_connection_and_channel_limit(Config, 0, Username, ConnLimit, ChLimit).
+
+set_user_connection_and_channel_limit(Config, NodeIndex, Username, ConnLimit, ChLimit) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++
+ ["{\"max-connections\": " ++ integer_to_list(ConnLimit) ++ "," ++
+ " \"max-channels\": " ++ integer_to_list(ChLimit) ++ "}"]).
+
+set_user_connection_limit_only(Config, Username, ConnLimit) ->
+ set_user_connection_limit_only(Config, 0, Username, ConnLimit).
+
+set_user_connection_limit_only(Config, NodeIndex, Username, ConnLimit) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++
+ ["{\"max-connections\": " ++ integer_to_list(ConnLimit) ++ "}"]).
+
+set_user_channel_limit_only(Config, Username, ChLimit) ->
+ set_user_channel_limit_only(Config, 0, Username, ChLimit).
+
+set_user_channel_limit_only(Config, NodeIndex, Username, ChLimit) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++
+ ["{\"max-channels\": " ++ integer_to_list(ChLimit) ++ "}"]).
+
+clear_all_user_limits(Config, Username) ->
+ clear_all_user_limits(Config, 0, Username).
+clear_all_user_limits(Config, NodeIndex, Username) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ clear_user_limits, Node, [rabbit_data_coercion:to_list(Username), "all"]).
+
+await_running_node_refresh(_Config, _NodeIndex) ->
+ timer:sleep(250).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, User) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, User, User).
+
+expect_that_client_channel_is_rejected(Conn) ->
+ {error, not_allowed} = open_channel(Conn).
diff --git a/test/per_user_connection_channel_limit_partitions_SUITE.erl b/test/per_user_connection_channel_limit_partitions_SUITE.erl
new file mode 100644
index 0000000000..32af9ce9a1
--- /dev/null
+++ b/test/per_user_connection_channel_limit_partitions_SUITE.erl
@@ -0,0 +1,174 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(per_user_connection_channel_limit_partitions_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2,
+ open_unmanaged_connection/3]).
+
+all() ->
+ [
+ {group, net_ticktime_1}
+ ].
+
+groups() ->
+ [
+ {net_ticktime_1, [], [
+ cluster_full_partition_with_autoheal
+ ]}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% see partitions_SUITE
+-define(DELAY, 12000).
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:configure_dist_proxy/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(net_ticktime_1 = Group, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]),
+ init_per_multinode_group(Group, Config1, 3).
+
+init_per_multinode_group(_Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+
+cluster_full_partition_with_autoheal(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ %% 6 connections, 2 per node
+ Conn1 = open_unmanaged_connection(Config, A),
+ Conn2 = open_unmanaged_connection(Config, A),
+ Conn3 = open_unmanaged_connection(Config, B),
+ Conn4 = open_unmanaged_connection(Config, B),
+ Conn5 = open_unmanaged_connection(Config, C),
+ Conn6 = open_unmanaged_connection(Config, C),
+
+ _Chans1 = [_|_] = open_channels(Conn1, 5),
+ _Chans3 = [_|_] = open_channels(Conn3, 5),
+ _Chans5 = [_|_] = open_channels(Conn5, 5),
+ wait_for_count_connections_in(Config, Username, 6, 60000),
+ ?assertEqual(15, count_channels_in(Config, Username)),
+
+ %% B drops off the network, non-reachable by either A or C
+ rabbit_ct_broker_helpers:block_traffic_between(A, B),
+ rabbit_ct_broker_helpers:block_traffic_between(B, C),
+ timer:sleep(?DELAY),
+
+ %% A and C are still connected, so 4 connections are tracked
+ %% All connections to B are dropped
+ wait_for_count_connections_in(Config, Username, 4, 60000),
+ ?assertEqual(10, count_channels_in(Config, Username)),
+
+ rabbit_ct_broker_helpers:allow_traffic_between(A, B),
+ rabbit_ct_broker_helpers:allow_traffic_between(B, C),
+ timer:sleep(?DELAY),
+
+ %% during autoheal B's connections were dropped
+ wait_for_count_connections_in(Config, Username, 4, 60000),
+ ?assertEqual(10, count_channels_in(Config, Username)),
+
+ lists:foreach(fun (Conn) ->
+ (catch rabbit_ct_client_helpers:close_connection(Conn))
+ end, [Conn1, Conn2, Conn3, Conn4,
+ Conn5, Conn6]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+
+ passed.
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+wait_for_count_connections_in(Config, Username, Expected, Time) when Time =< 0 ->
+ ?assertMatch(Connections when length(Connections) == Expected,
+ connections_in(Config, Username));
+wait_for_count_connections_in(Config, Username, Expected, Time) ->
+ case connections_in(Config, Username) of
+ Connections when length(Connections) == Expected ->
+ ok;
+ _ ->
+ Sleep = 3000,
+ timer:sleep(Sleep),
+ wait_for_count_connections_in(Config, Username, Expected, Time - Sleep)
+ end.
+
+open_channels(Conn, N) ->
+ [begin
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ Ch
+ end || _ <- lists:seq(1, N)].
+
+count_connections_in(Config, Username) ->
+ length(connections_in(Config, Username)).
+
+connections_in(Config, Username) ->
+ connections_in(Config, 0, Username).
+connections_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+count_channels_in(Config, Username) ->
+ Channels = channels_in(Config, Username),
+ length([Ch || Ch = #tracked_channel{username = Username0} <- Channels,
+ Username =:= Username0]).
+
+channels_in(Config, Username) ->
+ channels_in(Config, 0, Username).
+channels_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_of_user, [Username]).
diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl
new file mode 100644
index 0000000000..674a2e4177
--- /dev/null
+++ b/test/per_user_connection_channel_tracking_SUITE.erl
@@ -0,0 +1,840 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(per_user_connection_channel_tracking_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
+ ].
+
+groups() ->
+ ClusterSize1Tests = [
+ single_node_user_connection_channel_tracking,
+ single_node_user_deletion,
+ single_node_vhost_down_mimic,
+ single_node_vhost_deletion
+ ],
+ ClusterSize2Tests = [
+ cluster_user_deletion,
+ cluster_vhost_down_mimic,
+ cluster_vhost_deletion,
+ cluster_node_removed
+ ],
+ [
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests}
+ ].
+
+suite() ->
+ [
+ %% If a test hangs, no need to wait for 30 minutes.
+ {timetrap, {minutes, 8}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2).
+
+init_per_multinode_group(_Group, Config, NodeCount) ->
+ Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, NodeCount},
+ {rmq_nodename_suffix, Suffix}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+end_per_group(_Group, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ Config.
+
+end_per_testcase(Testcase, Config) ->
+ clear_all_connection_tracking_tables(Config),
+ clear_all_channel_tracking_tables(Config),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+clear_all_connection_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_connection_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+clear_all_channel_tracking_tables(Config) ->
+ [rabbit_ct_broker_helpers:rpc(Config,
+ N,
+ rabbit_channel_tracking,
+ clear_tracking_tables,
+ []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
+
+%% -------------------------------------------------------------------
+%% Test cases.
+%% -------------------------------------------------------------------
+single_node_user_connection_channel_tracking(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ [Chan1] = open_channels(Conn1, 1),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ [#tracked_channel{username = Username}] = channels_in(Config, Username),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ ?assertEqual(true, is_process_alive(Chan1)),
+ close_channels([Chan1]),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Chan1)),
+ close_connections([Conn1]),
+ ?assertEqual(0, length(connections_in(Config, Username))),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ timer:sleep(100),
+ [#tracked_connection{username = Username2}] = connections_in(Config, Username2),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ [Conn3] = open_connections(Config, [0]),
+ Chans3 = [_|_] = open_channels(Conn3, 5),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn3)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3],
+
+ [Conn4] = open_connections(Config, [0]),
+ Chans4 = [_|_] = open_channels(Conn4, 5),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(10, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn4)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4],
+ kill_connections([Conn4]),
+ [#tracked_connection{username = Username}] = connections_in(Config, Username),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn4)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4],
+
+ [Conn5] = open_connections(Config, [0]),
+ Chans5 = [_|_] = open_channels(Conn5, 7),
+ [Username, Username] =
+ lists:map(fun (#tracked_connection{username = U}) -> U end,
+ connections_in(Config, Username)),
+ ?assertEqual(12, count_channels_in(Config, Username)),
+ ?assertEqual(12, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(2, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn5)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5],
+
+ close_channels(Chans2 ++ Chans3 ++ Chans5),
+ ?assertEqual(0, length(all_channels(Config))),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ close_connections([Conn2, Conn3, Conn5]),
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, length(all_connections(Config))).
+
+single_node_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)),
+ ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)),
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+single_node_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 100),
+
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost).
+
+single_node_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{0, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% mimic vhost down event, while connections exist
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_user_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [0]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ rabbit_ct_broker_helpers:delete_user(Config, Username2),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% ensure user entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)),
+ ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)),
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+cluster_vhost_deletion(Config) ->
+ set_tracking_execution_timeout(Config, 0, 100),
+ set_tracking_execution_timeout(Config, 1, 100),
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 0)),
+ ?assertEqual(100, get_tracking_execution_timeout(Config, 1)),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:delete_vhost(Config, Vhost),
+ timer:sleep(200),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1],
+
+ %% ensure vhost entry is cleared after 'tracking_execution_timeout'
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)),
+ ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)),
+
+ rabbit_ct_broker_helpers:add_vhost(Config, Vhost),
+ rabbit_ct_broker_helpers:add_user(Config, Username),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost).
+
+cluster_vhost_down_mimic(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ mimic_vhost_down(Config, 1, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ %% gen_event notifies local handlers. remote connections still active
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ mimic_vhost_down(Config, 0, Vhost),
+ timer:sleep(100),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(false, is_process_alive(Conn1)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1].
+
+cluster_node_removed(Config) ->
+ Username = proplists:get_value(rmq_username, Config),
+ Username2 = <<"guest2">>,
+
+ Vhost = proplists:get_value(rmq_vhost, Config),
+
+ rabbit_ct_broker_helpers:add_user(Config, Username2),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost),
+
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, count_connections_in(Config, Username2)),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, count_channels_in(Config, Username2)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username2)),
+
+ [Conn1] = open_connections(Config, [{0, Username}]),
+ Chans1 = [_|_] = open_channels(Conn1, 5),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ [Conn2] = open_connections(Config, [{1, Username2}]),
+ Chans2 = [_|_] = open_channels(Conn2, 5),
+ ?assertEqual(1, count_connections_in(Config, Username2)),
+ ?assertEqual(5, count_channels_in(Config, Username2)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username2)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username2)),
+ ?assertEqual(true, is_process_alive(Conn2)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2],
+
+ rabbit_ct_broker_helpers:stop_broker(Config, 1),
+ timer:sleep(200),
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1),
+ timer:sleep(200),
+ NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+
+ DroppedConnTrackingTables =
+ rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables],
+
+ DroppedChTrackingTables =
+ rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName),
+ [?assertEqual(
+ {'EXIT', {aborted, {no_exists, Tab, all}}},
+ catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables],
+
+ ?assertEqual(false, is_process_alive(Conn2)),
+ [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2],
+
+ ?assertEqual(1, count_connections_in(Config, Username)),
+ ?assertEqual(5, count_channels_in(Config, Username)),
+ ?assertEqual(1, tracked_user_connection_count(Config, Username)),
+ ?assertEqual(5, tracked_user_channel_count(Config, Username)),
+ ?assertEqual(true, is_process_alive(Conn1)),
+ [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1],
+
+ close_channels(Chans1),
+ ?assertEqual(0, count_channels_in(Config, Username)),
+ ?assertEqual(0, tracked_user_channel_count(Config, Username)),
+
+ close_connections([Conn1]),
+ ?assertEqual(0, count_connections_in(Config, Username)),
+ ?assertEqual(0, tracked_user_connection_count(Config, Username)).
+
+%% -------------------------------------------------------------------
+%% Helpers
+%% -------------------------------------------------------------------
+
+open_connections(Config, NodesAndUsers) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
+ Conns = lists:map(fun
+ ({Node, User}) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
+ User, User);
+ (Node) ->
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
+ end, NodesAndUsers),
+ timer:sleep(500),
+ Conns.
+
+close_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ rabbit_ct_client_helpers:close_connection(Conn)
+ end, Conns),
+ timer:sleep(500).
+
+kill_connections(Conns) ->
+ lists:foreach(fun
+ (Conn) ->
+ (catch exit(Conn, please_terminate))
+ end, Conns),
+ timer:sleep(500).
+
+open_channels(Conn, N) ->
+ [begin
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ Ch
+ end || _ <- lists:seq(1, N)].
+
+close_channels(Channels = [_|_]) ->
+ [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels].
+
+count_connections_in(Config, Username) ->
+ length(connections_in(Config, Username)).
+
+connections_in(Config, Username) ->
+ connections_in(Config, 0, Username).
+connections_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+count_channels_in(Config, Username) ->
+ Channels = channels_in(Config, Username),
+ length([Ch || Ch = #tracked_channel{username = Username0} <- Channels,
+ Username =:= Username0]).
+
+channels_in(Config, Username) ->
+ channels_in(Config, 0, Username).
+channels_in(Config, NodeIndex, Username) ->
+ tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list_of_user, [Username]).
+
+tracked_user_connection_count(Config, Username) ->
+ tracked_user_connection_count(Config, 0, Username).
+tracked_user_connection_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username).
+
+tracked_user_channel_count(Config, Username) ->
+ tracked_user_channel_count(Config, 0, Username).
+tracked_user_channel_count(Config, NodeIndex, Username) ->
+ count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username).
+
+count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ count_tracked_items_in, [{user, Username}]).
+
+exists_in_tracked_connection_per_vhost_table(Config, VHost) ->
+ exists_in_tracked_connection_per_vhost_table(Config, 0, VHost).
+exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1,
+ VHost).
+
+exists_in_tracked_connection_per_user_table(Config, Username) ->
+ exists_in_tracked_connection_per_user_table(Config, 0, Username).
+exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracked_channel_per_user_table(Config, Username) ->
+ exists_in_tracked_channel_per_user_table(Config, 0, Username).
+exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) ->
+ exists_in_tracking_table(Config, NodeIndex,
+ fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1,
+ Username).
+
+exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ Tab = TableNameFun(Node),
+ AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ mnesia,
+ dirty_all_keys, [Tab]),
+ lists:member(Key, AllKeys).
+
+mimic_vhost_down(Config, NodeIndex, VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ rabbit_vhost, vhost_down, [VHost]).
+
+all_connections(Config) ->
+ all_connections(Config, 0).
+all_connections(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_connection_tracking).
+
+all_channels(Config) ->
+ all_channels(Config, 0).
+all_channels(Config, NodeIndex) ->
+ all_tracked_items(Config, NodeIndex, rabbit_channel_tracking).
+
+all_tracked_items(Config, NodeIndex, TrackingMod) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ TrackingMod,
+ list, []).
+
+set_up_vhost(Config, VHost) ->
+ rabbit_ct_broker_helpers:add_vhost(Config, VHost),
+ rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost),
+ set_vhost_connection_limit(Config, VHost, -1).
+
+set_vhost_connection_limit(Config, VHost, Count) ->
+ set_vhost_connection_limit(Config, 0, VHost, Count).
+
+set_vhost_connection_limit(Config, NodeIndex, VHost, Count) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(
+ Config, NodeIndex, nodename),
+ ok = rabbit_ct_broker_helpers:control_action(
+ set_vhost_limits, Node,
+ ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"],
+ [{"-p", binary_to_list(VHost)}]).
+
+set_tracking_execution_timeout(Config, Timeout) ->
+ set_tracking_execution_timeout(Config, 0, Timeout).
+set_tracking_execution_timeout(Config, NodeIndex, Timeout) ->
+ rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
+ application, set_env,
+ [rabbit, tracking_execution_timeout, Timeout]).
+
+get_tracking_execution_timeout(Config) ->
+ get_tracking_execution_timeout(Config, 0).
+get_tracking_execution_timeout(Config, NodeIndex) ->
+ {ok, Timeout} = rabbit_ct_broker_helpers:rpc(
+ Config, NodeIndex,
+ application, get_env,
+ [rabbit, tracking_execution_timeout]),
+ Timeout.
+
+await_running_node_refresh(_Config, _NodeIndex) ->
+ timer:sleep(250).
+
+expect_that_client_connection_is_rejected(Config) ->
+ expect_that_client_connection_is_rejected(Config, 0).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex).
+
+expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) ->
+ {error, not_allowed} =
+ rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost).
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index f3e37b0a0c..ef9e5c4554 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -137,7 +137,7 @@ clear_all_connection_tracking_tables(Config) ->
[rabbit_ct_broker_helpers:rpc(Config,
N,
rabbit_connection_tracking,
- clear_tracked_connection_tables_for_this_node,
+ clear_tracking_tables,
[]) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)].
%% -------------------------------------------------------------------
@@ -692,7 +692,7 @@ count_connections_in(Config, VHost, NodeIndex) ->
timer:sleep(200),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
- count_connections_in, [VHost]).
+ count_tracked_items_in, [{vhost, VHost}]).
connections_in(Config, VHost) ->
connections_in(Config, 0, VHost).
diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl
index e4c6864ea0..2748d95592 100644
--- a/test/per_vhost_connection_limit_partitions_SUITE.erl
+++ b/test/per_vhost_connection_limit_partitions_SUITE.erl
@@ -140,7 +140,7 @@ count_connections_in(Config, VHost) ->
count_connections_in(Config, VHost, NodeIndex) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
- count_connections_in, [VHost]).
+ count_tracked_items_in, [{vhost, VHost}]).
connections_in(Config, VHost) ->
connections_in(Config, 0, VHost).
@@ -148,22 +148,3 @@ connections_in(Config, NodeIndex, VHost) ->
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
list, [VHost]).
-
-connections_on_node(Config) ->
- connections_on_node(Config, 0).
-connections_on_node(Config, NodeIndex) ->
- Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename),
- rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
- rabbit_connection_tracking,
- list_on_node, [Node]).
-connections_on_node(Config, NodeIndex, NodeForListing) ->
- rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
- rabbit_connection_tracking,
- list_on_node, [NodeForListing]).
-
-all_connections(Config) ->
- all_connections(Config, 0).
-all_connections(Config, NodeIndex) ->
- rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
- rabbit_connection_tracking,
- list, []).
diff --git a/test/unit_access_control_SUITE.erl b/test/unit_access_control_SUITE.erl
index fcfd9e2bde..af8f481083 100644
--- a/test/unit_access_control_SUITE.erl
+++ b/test/unit_access_control_SUITE.erl
@@ -94,23 +94,17 @@ password_hashing1(_Config) ->
rabbit_password_hashing_md5 =
rabbit_auth_backend_internal:hashing_module_for_user(
- #internal_user{}),
+ internal_user:new()),
rabbit_password_hashing_md5 =
rabbit_auth_backend_internal:hashing_module_for_user(
- #internal_user{
- hashing_algorithm = undefined
- }),
+ internal_user:new({hashing_algorithm, undefined})),
rabbit_password_hashing_md5 =
rabbit_auth_backend_internal:hashing_module_for_user(
- #internal_user{
- hashing_algorithm = rabbit_password_hashing_md5
- }),
+ internal_user:new({hashing_algorithm, rabbit_password_hashing_md5})),
rabbit_password_hashing_sha256 =
rabbit_auth_backend_internal:hashing_module_for_user(
- #internal_user{
- hashing_algorithm = rabbit_password_hashing_sha256
- }),
+ internal_user:new({hashing_algorithm, rabbit_password_hashing_sha256})),
passed.
@@ -211,23 +205,20 @@ set_tags_for_passwordless_user1(_Config) ->
ok = rabbit_auth_backend_internal:set_tags(Username, [management],
<<"acting-user">>),
- ?assertMatch(
- {ok, #internal_user{tags = [management]}},
- rabbit_auth_backend_internal:lookup_user(Username)),
+ {ok, User1} = rabbit_auth_backend_internal:lookup_user(Username),
+ ?assertEqual([management], internal_user:get_tags(User1)),
ok = rabbit_auth_backend_internal:set_tags(Username, [management, policymaker],
<<"acting-user">>),
- ?assertMatch(
- {ok, #internal_user{tags = [management, policymaker]}},
- rabbit_auth_backend_internal:lookup_user(Username)),
+ {ok, User2} = rabbit_auth_backend_internal:lookup_user(Username),
+ ?assertEqual([management, policymaker], internal_user:get_tags(User2)),
ok = rabbit_auth_backend_internal:set_tags(Username, [],
<<"acting-user">>),
- ?assertMatch(
- {ok, #internal_user{tags = []}},
- rabbit_auth_backend_internal:lookup_user(Username)),
+ {ok, User3} = rabbit_auth_backend_internal:lookup_user(Username),
+ ?assertEqual([], internal_user:get_tags(User3)),
ok = rabbit_auth_backend_internal:delete_user(Username,
<<"acting-user">>),
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 5906dfecb1..4e6ffe0d74 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -351,7 +351,7 @@ count_connections_in(Config, VHost, NodeIndex) ->
timer:sleep(200),
rabbit_ct_broker_helpers:rpc(Config, NodeIndex,
rabbit_connection_tracking,
- count_connections_in, [VHost]).
+ count_tracked_items_in, [{vhost, VHost}]).
set_up_vhost(Config, VHost) ->
rabbit_ct_broker_helpers:add_vhost(Config, VHost),