diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-08-31 08:37:22 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-08-31 08:37:22 +0300 |
| commit | 58a3b450ab3cabe29576bfb6504449c94ae1ac62 (patch) | |
| tree | 8e59bacdd4b2bcafaf12f2e3444e5e7a314d3be5 | |
| parent | 716d293e0dfc727700dfb93be64081c5cbf6dd5a (diff) | |
| parent | 37641110273bdfe274d378a549cb9343980be8f2 (diff) | |
| download | rabbitmq-server-git-58a3b450ab3cabe29576bfb6504449c94ae1ac62.tar.gz | |
Merge branch 'Ayanda-D-rabbitmq-per-user-connection-channel-limits'
31 files changed, 3950 insertions, 321 deletions
@@ -115,7 +115,9 @@ define PROJECT_ENV %% Default max message size is 128 MB {max_message_size, 134217728}, %% Socket writer will run GC every 1 GB of outgoing data - {writer_gc_threshold, 1000000000} + {writer_gc_threshold, 1000000000}, + %% interval at which connection/channel tracking executes post operations + {tracking_execution_timeout, 15000} ] endef diff --git a/src/internal_user.erl b/src/internal_user.erl new file mode 100644 index 0000000000..35b67741ab --- /dev/null +++ b/src/internal_user.erl @@ -0,0 +1,203 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(internal_user). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([ + new/0, + new/1, + record_version_to_use/0, + fields/0, + fields/1, + upgrade/1, + upgrade_to/2, + pattern_match_all/0, + get_username/1, + get_password_hash/1, + get_tags/1, + get_hashing_algorithm/1, + get_limits/1, + create_user/3, + set_password_hash/3, + set_tags/2, + update_limits/3, + clear_limits/1 +]). + +-define(record_version, internal_user_v2). + +-type(username() :: binary()). + +-type(password_hash() :: binary()). + +-type internal_user() :: internal_user_v1:internal_user_v1() | internal_user_v2(). + +-record(internal_user, { + username :: username() | '_', + password_hash :: password_hash() | '_', + tags :: [atom()] | '_', + %% password hashing implementation module, + %% typically rabbit_password_hashing_* but can + %% come from a plugin + hashing_algorithm :: atom() | '_', + limits = #{} :: map() | '_'}). + +-type(internal_user_v2() :: + #internal_user{username :: username(), + password_hash :: password_hash(), + tags :: [atom()], + hashing_algorithm :: atom(), + limits :: map()}). + +-type internal_user_pattern() :: internal_user_v1:internal_user_v1_pattern() | + internal_user_v2_pattern(). + +-type internal_user_v2_pattern() :: #internal_user{ + username :: username() | '_', + password_hash :: '_', + tags :: '_', + hashing_algorithm :: '_', + limits :: '_' + }. + +-export_type([username/0, + password_hash/0, + internal_user/0, + internal_user_v2/0, + internal_user_pattern/0, + internal_user_v2_pattern/0]). + +-spec new() -> internal_user(). +new() -> + case record_version_to_use() of + ?record_version -> + #internal_user{}; + _ -> + internal_user_v1:new() + end. + +-spec new(tuple()) -> internal_user(). +new({hashing_algorithm, HashingAlgorithm}) -> + case record_version_to_use() of + ?record_version -> + #internal_user{hashing_algorithm = HashingAlgorithm}; + _ -> + internal_user_v1:new({hashing_algorithm, HashingAlgorithm}) + end; +new({tags, Tags}) -> + case record_version_to_use() of + ?record_version -> + #internal_user{tags = Tags}; + _ -> + internal_user_v1:new({tags, Tags}) + end. + +-spec record_version_to_use() -> internal_user_v1 | internal_user_v2. +record_version_to_use() -> + case rabbit_feature_flags:is_enabled(user_limits) of + true -> ?record_version; + false -> internal_user_v1:record_version_to_use() + end. + +-spec fields() -> list(). +fields() -> + case record_version_to_use() of + ?record_version -> fields(?record_version); + _ -> internal_user_v1:fields() + end. + +-spec fields(atom()) -> list(). +fields(?record_version) -> record_info(fields, internal_user); +fields(Version) -> internal_user_v1:fields(Version). + +-spec upgrade(internal_user()) -> internal_user(). +upgrade(#internal_user{} = User) -> User; +upgrade(OldUser) -> upgrade_to(record_version_to_use(), OldUser). + +-spec upgrade_to +(internal_user_v2, internal_user()) -> internal_user_v2(); +(internal_user_v1, internal_user_v1:internal_user_v1()) -> internal_user_v1:internal_user_v1(). + +upgrade_to(?record_version, #internal_user{} = User) -> + User; +upgrade_to(?record_version, OldUser) -> + Fields = erlang:tuple_to_list(OldUser) ++ [#{}], + #internal_user{} = erlang:list_to_tuple(Fields); +upgrade_to(Version, OldUser) -> + internal_user_v1:upgrade_to(Version, OldUser). + +-spec pattern_match_all() -> internal_user_pattern(). +pattern_match_all() -> + case record_version_to_use() of + ?record_version -> #internal_user{_ = '_'}; + _ -> internal_user_v1:pattern_match_all() + end. + +-spec get_username(internal_user()) -> username(). +get_username(#internal_user{username = Value}) -> Value; +get_username(User) -> internal_user_v1:get_username(User). + +-spec get_password_hash(internal_user()) -> password_hash(). +get_password_hash(#internal_user{password_hash = Value}) -> Value; +get_password_hash(User) -> internal_user_v1:get_password_hash(User). + +-spec get_tags(internal_user()) -> [atom()]. +get_tags(#internal_user{tags = Value}) -> Value; +get_tags(User) -> internal_user_v1:get_tags(User). + +-spec get_hashing_algorithm(internal_user()) -> atom(). +get_hashing_algorithm(#internal_user{hashing_algorithm = Value}) -> Value; +get_hashing_algorithm(User) -> internal_user_v1:get_hashing_algorithm(User). + +-spec get_limits(internal_user()) -> map(). +get_limits(#internal_user{limits = Value}) -> Value; +get_limits(User) -> internal_user_v1:get_limits(User). + +-spec create_user(username(), password_hash(), atom()) -> internal_user(). +create_user(Username, PasswordHash, HashingMod) -> + case record_version_to_use() of + ?record_version -> + #internal_user{username = Username, + password_hash = PasswordHash, + tags = [], + hashing_algorithm = HashingMod, + limits = #{} + }; + _ -> + internal_user_v1:create_user(Username, PasswordHash, HashingMod) + end. + +-spec set_password_hash(internal_user(), password_hash(), atom()) -> internal_user(). +set_password_hash(#internal_user{} = User, PasswordHash, HashingAlgorithm) -> + User#internal_user{password_hash = PasswordHash, + hashing_algorithm = HashingAlgorithm}; +set_password_hash(User, PasswordHash, HashingAlgorithm) -> + internal_user_v1:set_password_hash(User, PasswordHash, HashingAlgorithm). + +-spec set_tags(internal_user(), [atom()]) -> internal_user(). +set_tags(#internal_user{} = User, Tags) -> + User#internal_user{tags = Tags}; +set_tags(User, Tags) -> + internal_user_v1:set_tags(User, Tags). + +-spec update_limits +(add, internal_user(), map()) -> internal_user(); +(remove, internal_user(), term()) -> internal_user(). +update_limits(add, #internal_user{limits = Limits} = User, Term) -> + User#internal_user{limits = maps:merge(Limits, Term)}; +update_limits(remove, #internal_user{limits = Limits} = User, LimitType) -> + User#internal_user{limits = maps:remove(LimitType, Limits)}; +update_limits(Action, User, Term) -> + internal_user_v1:update_limits(Action, User, Term). + +-spec clear_limits(internal_user()) -> internal_user(). +clear_limits(#internal_user{} = User) -> + User#internal_user{limits = #{}}; +clear_limits(User) -> + internal_user_v1:update_limits(User). diff --git a/src/internal_user_v1.erl b/src/internal_user_v1.erl new file mode 100644 index 0000000000..0a290e5c53 --- /dev/null +++ b/src/internal_user_v1.erl @@ -0,0 +1,138 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(internal_user_v1). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([ + new/0, + new/1, + record_version_to_use/0, + fields/0, + fields/1, + upgrade/1, + upgrade_to/2, + pattern_match_all/0, + get_username/1, + get_password_hash/1, + get_tags/1, + get_hashing_algorithm/1, + get_limits/1, + create_user/3, + set_password_hash/3, + set_tags/2, + update_limits/3, + clear_limits/1 +]). + +-define(record_version, ?MODULE). + +-record(internal_user, { + username :: internal_user:username() | '_', + password_hash :: internal_user:password_hash() | '_', + tags :: [atom()] | '_', + %% password hashing implementation module, + %% typically rabbit_password_hashing_* but can + %% come from a plugin + hashing_algorithm :: atom() | '_'}). + +-type internal_user() :: internal_user_v1(). + +-type(internal_user_v1() :: + #internal_user{username :: internal_user:username(), + password_hash :: internal_user:password_hash(), + tags :: [atom()], + hashing_algorithm :: atom()}). + +-type internal_user_pattern() :: internal_user_v1_pattern(). + +-type internal_user_v1_pattern() :: #internal_user{ + username :: internal_user:username() | '_', + password_hash :: '_', + tags :: '_', + hashing_algorithm :: '_' + }. + +-export_type([internal_user/0, + internal_user_v1/0, + internal_user_pattern/0, + internal_user_v1_pattern/0]). + +-spec record_version_to_use() -> internal_user_v1. +record_version_to_use() -> + ?record_version. + +-spec new() -> internal_user(). +new() -> + #internal_user{}. + +-spec new(tuple()) -> internal_user(). +new({hashing_algorithm, HashingAlgorithm}) -> + #internal_user{hashing_algorithm = HashingAlgorithm}; +new({tags, Tags}) -> + #internal_user{tags = Tags}. + +-spec fields() -> list(). +fields() -> fields(?record_version). + +-spec fields(atom()) -> list(). +fields(?record_version) -> record_info(fields, internal_user). + +-spec upgrade(internal_user()) -> internal_user(). +upgrade(#internal_user{} = User) -> User. + +-spec upgrade_to(internal_user_v1, internal_user()) -> internal_user(). +upgrade_to(?record_version, #internal_user{} = User) -> + User. + +-spec pattern_match_all() -> internal_user_pattern(). +pattern_match_all() -> #internal_user{_ = '_'}. + +-spec get_username(internal_user()) -> internal_user:username(). +get_username(#internal_user{username = Value}) -> Value. + +-spec get_password_hash(internal_user()) -> internal_user:password_hash(). +get_password_hash(#internal_user{password_hash = Value}) -> Value. + +-spec get_tags(internal_user()) -> [atom()]. +get_tags(#internal_user{tags = Value}) -> Value. + +-spec get_hashing_algorithm(internal_user()) -> atom(). +get_hashing_algorithm(#internal_user{hashing_algorithm = Value}) -> Value. + +-spec get_limits(internal_user()) -> map(). +get_limits(_User) -> #{}. + +-spec create_user(internal_user:username(), internal_user:password_hash(), + atom()) -> internal_user(). +create_user(Username, PasswordHash, HashingMod) -> + #internal_user{username = Username, + password_hash = PasswordHash, + tags = [], + hashing_algorithm = HashingMod + }. + +-spec set_password_hash(internal_user:internal_user(), + internal_user:password_hash(), atom()) -> internal_user(). +set_password_hash(#internal_user{} = User, PasswordHash, HashingAlgorithm) -> + User#internal_user{password_hash = PasswordHash, + hashing_algorithm = HashingAlgorithm}. + +-spec set_tags(internal_user(), [atom()]) -> internal_user(). +set_tags(#internal_user{} = User, Tags) -> + User#internal_user{tags = Tags}. + +-spec update_limits +(add, internal_user(), map()) -> internal_user(); +(remove, internal_user(), term()) -> internal_user(). +update_limits(_, User, _) -> + User. + +-spec clear_limits(internal_user()) -> internal_user(). +clear_limits(User) -> + User. diff --git a/src/rabbit.erl b/src/rabbit.erl index f58ae1db94..f057fb0da3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,6 +199,11 @@ {mfa, {rabbit_connection_tracking, boot, []}}, {enables, routing_ready}]}). +-rabbit_boot_step({channel_tracking, + [{description, "channel tracking infrastructure"}, + {mfa, {rabbit_channel_tracking, boot, []}}, + {enables, routing_ready}]}). + -rabbit_boot_step({background_gc, [{description, "background garbage collection"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9818e689de..4b796b9d66 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -529,7 +529,7 @@ rebalance(Type, VhostSpec, QueueSpec) -> maybe_rebalance({true, Id}, Type, VhostSpec, QueueSpec) -> rabbit_log:info("Starting queue rebalance operation: '~s' for vhosts matching '~s' and queues matching '~s'", [Type, VhostSpec, QueueSpec]), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), NumRunning = length(Running), ToRebalance = [Q || Q <- rabbit_amqqueue:list(), filter_per_type(Type, Q), @@ -1308,7 +1308,7 @@ emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> rabbit_control_misc:await_emitters_termination(Pids). collect_info_all(VHostPath, Items) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), Ref = make_ref(), Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, self()]) || Node <- Nodes ], rabbit_control_misc:await_emitters_termination(Pids), @@ -1896,7 +1896,7 @@ node_permits_offline_promotion(Node) -> case node() of Node -> not rabbit:is_running(); %% [1] _ -> All = rabbit_mnesia:cluster_nodes(all), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), lists:member(Node, All) andalso not lists:member(Node, Running) %% [2] end. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 835fd9557b..cb930a1630 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -14,16 +14,19 @@ -export([user_login_authentication/2, user_login_authorization/2, check_vhost_access/3, check_resource_access/4, check_topic_access/4]). --export([add_user/3, delete_user/2, lookup_user/1, +-export([add_user/3, delete_user/2, lookup_user/1, exists/1, change_password/3, clear_password/2, hash_password/2, change_password_hash/2, change_password_hash/3, set_tags/3, set_permissions/6, clear_permissions/3, set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4, add_user_sans_validation/3, put_user/2, put_user/3]). +-export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1, + is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]). + -export([user_info_keys/0, perms_info_keys/0, user_perms_info_keys/0, vhost_perms_info_keys/0, - user_vhost_perms_info_keys/0, + user_vhost_perms_info_keys/0, all_users/0, list_users/0, list_users/2, list_permissions/0, list_user_permissions/1, list_user_permissions/3, list_topic_permissions/0, @@ -47,9 +50,9 @@ %% there is no information in the record, we consider it to be legacy %% (inserted by a version older than 3.6.0) and fall back to MD5, the %% now obsolete hashing function. -hashing_module_for_user(#internal_user{ - hashing_algorithm = ModOrUndefined}) -> - rabbit_password:hashing_mod(ModOrUndefined). +hashing_module_for_user(User) -> + ModOrUndefined = internal_user:get_hashing_algorithm(User), + rabbit_password:hashing_mod(ModOrUndefined). -define(BLANK_PASSWORD_REJECTION_MESSAGE, "user '~s' attempted to log in with a blank password, which is prohibited by the internal authN backend. " @@ -75,13 +78,14 @@ user_login_authentication(Username, AuthProps) -> {password, Cleartext} -> internal_check_user_login( Username, - fun (#internal_user{ - password_hash = <<Salt:4/binary, Hash/binary>> - } = U) -> - Hash =:= rabbit_password:salted_hash( - hashing_module_for_user(U), Salt, Cleartext); - (#internal_user{}) -> - false + fun(User) -> + case internal_user:get_password_hash(User) of + <<Salt:4/binary, Hash/binary>> -> + Hash =:= rabbit_password:salted_hash( + hashing_module_for_user(User), Salt, Cleartext); + _ -> + false + end end); false -> exit({unknown_auth_props, Username, AuthProps}) end. @@ -97,7 +101,8 @@ user_login_authorization(Username, _AuthProps) -> internal_check_user_login(Username, Fun) -> Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of - {ok, User = #internal_user{tags = Tags}} -> + {ok, User} -> + Tags = internal_user:get_tags(User), case Fun(User) of true -> {ok, #auth_user{username = Username, tags = Tags, @@ -207,10 +212,8 @@ add_user_sans_validation(Username, Password, ActingUser) -> %% but we also need to store a hint as part of the record, so we %% retrieve it here one more time HashingMod = rabbit_password:hashing_mod(), - User = #internal_user{username = Username, - password_hash = hash_password(HashingMod, Password), - tags = [], - hashing_algorithm = HashingMod}, + PasswordHash = hash_password(HashingMod, Password), + User = internal_user:create_user(Username, PasswordHash, HashingMod), try R = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -280,12 +283,20 @@ delete_user(Username, ActingUser) -> -spec lookup_user (rabbit_types:username()) -> - rabbit_types:ok(rabbit_types:internal_user()) | + rabbit_types:ok(internal_user:internal_user()) | rabbit_types:error('not_found'). lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). +-spec exists(rabbit_types:username()) -> boolean(). + +exists(Username) -> + case lookup_user(Username) of + {error, not_found} -> false; + _ -> true + end. + -spec change_password (rabbit_types:username(), rabbit_types:password(), rabbit_types:username()) -> 'ok'. @@ -343,9 +354,8 @@ change_password_hash(Username, PasswordHash) -> change_password_hash(Username, PasswordHash, HashingAlgorithm) -> update_user(Username, fun(User) -> - User#internal_user{ - password_hash = PasswordHash, - hashing_algorithm = HashingAlgorithm } + internal_user:set_password_hash(User, + PasswordHash, HashingAlgorithm) end). -spec set_tags(rabbit_types:username(), [atom()], rabbit_types:username()) -> 'ok'. @@ -355,7 +365,7 @@ set_tags(Username, Tags, ActingUser) -> rabbit_log:debug("Asked to set user tags for user '~s' to ~p", [Username, ConvertedTags]), try R = update_user(Username, fun(User) -> - User#internal_user{tags = ConvertedTags} + internal_user:set_tags(User, ConvertedTags) end), rabbit_log:info("Successfully set user tags for user '~s' to ~p", [Username, ConvertedTags]), rabbit_event:notify(user_tags_set, [{name, Username}, {tags, ConvertedTags}, @@ -657,11 +667,6 @@ put_user(User, Version, ActingUser) -> T <- string:tokens(binary_to_list(TagsS), ",")] end, - UserExists = case rabbit_auth_backend_internal:lookup_user(Username) of - {error, not_found} -> false; - _ -> true - end, - %% pre-configured, only applies to newly created users Permissions = maps:get(permissions, User, undefined), @@ -674,7 +679,7 @@ put_user(User, Version, ActingUser) -> rabbit_credential_validation:validate(Username, Password) =:= ok end, - case UserExists of + case exists(Username) of true -> case {HasPassword, HasPasswordHash} of {true, false} -> @@ -763,6 +768,57 @@ preconfigure_permissions(Username, Map, ActingUser) when is_map(Map) -> Map), ok. +set_user_limits(Username, Definition, ActingUser) when is_list(Definition); is_binary(Definition) -> + case rabbit_feature_flags:is_enabled(user_limits) of + true -> + case rabbit_json:try_decode(rabbit_data_coercion:to_binary(Definition)) of + {ok, Term} -> + validate_parameters_and_update_limit(Username, Term, ActingUser); + {error, Reason} -> + {error_string, rabbit_misc:format( + "JSON decoding error. Reason: ~ts", [Reason])} + end; + false -> {error_string, "cannot set any user limits: the user_limits feature flag is not enabled"} + end; +set_user_limits(Username, Definition, ActingUser) when is_map(Definition) -> + case rabbit_feature_flags:is_enabled(user_limits) of + true -> validate_parameters_and_update_limit(Username, Definition, ActingUser); + false -> {error_string, "cannot set any user limits: the user_limits feature flag is not enabled"} + end. + +validate_parameters_and_update_limit(Username, Term, ActingUser) -> + case flatten_errors(rabbit_parameter_validation:proplist( + <<"user-limits">>, user_limit_validation(), Term)) of + ok -> + update_user(Username, fun(User) -> + internal_user:update_limits(add, User, Term) + end), + notify_limit_set(Username, ActingUser, Term); + {errors, [{Reason, Arguments}]} -> + {error_string, rabbit_misc:format(Reason, Arguments)} + end. + +user_limit_validation() -> + [{<<"max-connections">>, fun rabbit_parameter_validation:integer/2, optional}, + {<<"max-channels">>, fun rabbit_parameter_validation:integer/2, optional}]. + +clear_user_limits(Username, <<"all">>, ActingUser) -> + update_user(Username, fun(User) -> + internal_user:clear_limits(User) + end), + notify_limit_clear(Username, ActingUser); +clear_user_limits(Username, LimitType, ActingUser) -> + update_user(Username, fun(User) -> + internal_user:update_limits(remove, User, LimitType) + end), + notify_limit_clear(Username, ActingUser). + +flatten_errors(L) -> + case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of + [] -> ok; + E -> {errors, E} + end. + %%---------------------------------------------------------------------------- %% Listing @@ -794,11 +850,13 @@ user_topic_perms_info_keys() -> [vhost, exchange, write, read]. vhost_topic_perms_info_keys() -> [user, exchange, write, read]. user_vhost_topic_perms_info_keys() -> [exchange, write, read]. +all_users() -> mnesia:dirty_match_object(rabbit_user, internal_user:pattern_match_all()). + -spec list_users() -> [rabbit_types:infos()]. list_users() -> [extract_internal_user_params(U) || - U <- mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})]. + U <- all_users()]. -spec list_users(reference(), pid()) -> 'ok'. @@ -806,7 +864,7 @@ list_users(Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(U) -> extract_internal_user_params(U) end, - mnesia:dirty_match_object(rabbit_user, #internal_user{_ = '_'})). + all_users()). -spec list_permissions() -> [rabbit_types:infos()]. @@ -881,8 +939,9 @@ extract_user_permission_params(Keys, #user_permission{ {write, WritePerm}, {read, ReadPerm}]). -extract_internal_user_params(#internal_user{username = Username, tags = Tags}) -> - [{user, Username}, {tags, Tags}]. +extract_internal_user_params(User) -> + [{user, internal_user:get_username(User)}, + {tags, internal_user:get_tags(User)}]. match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( @@ -959,3 +1018,59 @@ hashing_algorithm(User, Version) -> end; Alg -> rabbit_data_coercion:to_atom(Alg, utf8) end. + +is_over_connection_limit(Username) -> + Fun = fun() -> + rabbit_connection_tracking:count_tracked_items_in({user, Username}) + end, + is_over_limit(Username, <<"max-connections">>, Fun). + +is_over_channel_limit(Username) -> + Fun = fun() -> + rabbit_channel_tracking:count_tracked_items_in({user, Username}) + end, + is_over_limit(Username, <<"max-channels">>, Fun). + +is_over_limit(Username, LimitType, Fun) -> + case get_user_limit(Username, LimitType) of + undefined -> false; + {ok, 0} -> {true, 0}; + {ok, Limit} -> + case Fun() >= Limit of + false -> false; + true -> {true, Limit} + end + end. + +get_user_limit(Username, LimitType) -> + case lookup_user(Username) of + {ok, User} -> + case rabbit_misc:pget(LimitType, internal_user:get_limits(User)) of + undefined -> undefined; + N when N < 0 -> undefined; + N when N >= 0 -> {ok, N} + end; + _ -> + undefined + end. + +get_user_limits() -> + [{internal_user:get_username(U), internal_user:get_limits(U)} || + U <- all_users(), + internal_user:get_limits(U) =/= #{}]. + +get_user_limits(Username) -> + case lookup_user(Username) of + {ok, User} -> internal_user:get_limits(User); + _ -> undefined + end. + +notify_limit_set(Username, ActingUser, Term) -> + rabbit_event:notify(user_limits_set, + [{name, <<"limits">>}, {user_who_performed_action, ActingUser}, + {username, Username} | maps:to_list(Term)]). + +notify_limit_clear(Username, ActingUser) -> + rabbit_event:notify(user_limits_cleared, + [{name, <<"limits">>}, {user_who_performed_action, ActingUser}, + {username, Username}]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e57dfb686c..b65912710f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -352,7 +352,7 @@ send_drained(Pid, CTagCredit) -> -spec list() -> [pid()]. list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), + rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(), rabbit_channel, list_local, []). -spec list_local() -> [pid()]. diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl new file mode 100644 index 0000000000..2dbc4d0cf3 --- /dev/null +++ b/src/rabbit_channel_tracking.erl @@ -0,0 +1,289 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_channel_tracking_handler +%% * rabbit_reader +%% * rabbit_event +-behaviour(rabbit_tracking). + +-export([boot/0, + update_tracked/1, + handle_cast/1, + register_tracked/1, + unregister_tracked/1, + count_tracked_items_in/1, + clear_tracking_tables/0, + shutdown_tracked_items/2]). + +-export([list/0, list_of_user/1, list_on_node/1, + tracked_channel_table_name_for/1, + tracked_channel_per_user_table_name_for/1, + get_all_tracked_channel_table_names_for_node/1, + delete_tracked_channel_user_entry/1]). + +-include_lib("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +%% +%% API +%% + +%% Sets up and resets channel tracking tables for this node. +-spec boot() -> ok. + +boot() -> + ensure_tracked_channels_table_for_this_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_table_name_for(node())]), + ensure_per_user_tracked_channels_table_for_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_per_user_table_name_for(node())]), + clear_tracking_tables(), + ok. + +-spec update_tracked(term()) -> ok. + +update_tracked(Event) -> + spawn(?MODULE, handle_cast, [Event]). + +%% Asynchronously handle update events +-spec handle_cast(term()) -> ok. + +handle_cast({channel_created, Details}) -> + ThisNode = node(), + case node(pget(pid, Details)) of + ThisNode -> + TrackedCh = #tracked_channel{id = TrackedChId} = + tracked_channel_from_channel_created_event(Details), + try + register_tracked(TrackedCh) + catch + error:{no_exists, _} -> + Msg = "Could not register channel ~p for tracking, " + "its table is not ready yet or the channel terminated prematurely", + rabbit_log_connection:warning(Msg, [TrackedChId]), + ok; + error:Err -> + Msg = "Could not register channel ~p for tracking: ~p", + rabbit_log_connection:warning(Msg, [TrackedChId, Err]), + ok + end; + _OtherNode -> + %% ignore + ok + end; +handle_cast({channel_closed, Details}) -> + %% channel has terminated, unregister iff local + case get_tracked_channel_by_pid(pget(pid, Details)) of + [#tracked_channel{name = Name}] -> + unregister_tracked(rabbit_tracking:id(node(), Name)); + _Other -> ok + end; +handle_cast({connection_closed, ConnDetails}) -> + ThisNode= node(), + ConnPid = pget(pid, ConnDetails), + + case pget(node, ConnDetails) of + ThisNode -> + TrackedChs = get_tracked_channels_by_connection_pid(ConnPid), + rabbit_log_connection:info( + "Closing all channels from connection '~p' " + "because it has been closed", [pget(name, ConnDetails)]), + shutdown_tracked_items(TrackedChs, undefined), + [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) || + #tracked_channel{name = Name} <- TrackedChs], + ok; + _DifferentNode -> + ok + end; +handle_cast({user_deleted, Details}) -> + Username = pget(name, Details), + %% Schedule user entry deletion, allowing time for connections to close + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_channel_user_entry, [Username]), + ok; +handle_cast({node_deleted, Details}) -> + Node = pget(node, Details), + rabbit_log_connection:info( + "Node '~s' was removed from the cluster, deleting" + " its channel tracking tables...", [Node]), + delete_tracked_channels_table_for_node(Node), + delete_per_user_tracked_channels_table_for_node(Node). + +-spec register_tracked(rabbit_types:tracked_channel()) -> ok. +-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]). + +register_tracked(TrackedCh = + #tracked_channel{node = Node, name = Name, username = Username}) -> + ChId = rabbit_tracking:id(Node, Name), + TableName = tracked_channel_table_name_for(Node), + PerUserChTableName = tracked_channel_per_user_table_name_for(Node), + %% upsert + case mnesia:dirty_read(TableName, ChId) of + [] -> + mnesia:dirty_write(TableName, TrackedCh), + mnesia:dirty_update_counter(PerUserChTableName, Username, 1); + [#tracked_channel{}] -> + ok + end, + ok. + +-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok. + +unregister_tracked(ChId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_channel_table_name_for(Node), + PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:dirty_read(TableName, ChId) of + [] -> ok; + [#tracked_channel{username = Username}] -> + mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1), + mnesia:dirty_delete(TableName, ChId) + end. + +-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer(). + +count_tracked_items_in({user, Username}) -> + rabbit_tracking:count_tracked_items( + fun tracked_channel_per_user_table_name_for/1, + #tracked_channel_per_user.channel_count, Username, + "channels in vhost"). + +-spec clear_tracking_tables() -> ok. + +clear_tracking_tables() -> + clear_tracked_channel_tables_for_this_node(). + +-spec shutdown_tracked_items(list(), term()) -> ok. + +shutdown_tracked_items(TrackedItems, _Args) -> + close_channels(TrackedItems). + +%% helper functions +-spec list() -> [rabbit_types:tracked_channel()]. + +list() -> + lists:foldl( + fun (Node, Acc) -> + Tab = tracked_channel_table_name_for(Node), + Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'}) + end, [], rabbit_nodes:all_running()). + +-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()]. + +list_of_user(Username) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{username = Username, _ = '_'}). + +-spec list_on_node(node()) -> [rabbit_types:tracked_channel()]. + +list_on_node(Node) -> + try mnesia:dirty_match_object( + tracked_channel_table_name_for(Node), + #tracked_channel{_ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + +-spec tracked_channel_table_name_for(node()) -> atom(). + +tracked_channel_table_name_for(Node) -> + list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])). + +-spec tracked_channel_per_user_table_name_for(node()) -> atom(). + +tracked_channel_per_user_table_name_for(Node) -> + list_to_atom(rabbit_misc:format( + "tracked_channel_table_per_user_on_node_~s", [Node])). + +%% internal +ensure_tracked_channels_table_for_this_node() -> + ensure_tracked_channels_table_for_node(node()). + +ensure_per_user_tracked_channels_table_for_node() -> + ensure_per_user_tracked_channels_table_for_node(node()). + +%% Create tables +ensure_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel}, + {attributes, record_info(fields, tracked_channel)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +ensure_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user}, + {attributes, record_info(fields, tracked_channel_per_user)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +clear_tracked_channel_tables_for_this_node() -> + [rabbit_tracking:clear_tracking_table(T) + || T <- get_all_tracked_channel_table_names_for_node(node())]. + +delete_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel"). + +delete_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, + "per-user tracked channels"). + +get_all_tracked_channel_table_names_for_node(Node) -> + [tracked_channel_table_name_for(Node), + tracked_channel_per_user_table_name_for(Node)]. + +get_tracked_channels_by_connection_pid(ConnPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{connection = ConnPid, _ = '_'}). + +get_tracked_channel_by_pid(ChPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{pid = ChPid, _ = '_'}). + +delete_tracked_channel_user_entry(Username) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_auth_backend_internal, exists, [Username]}, + fun tracked_channel_per_user_table_name_for/1, + Username). + +tracked_channel_from_channel_created_event(ChannelDetails) -> + Node = node(ChPid = pget(pid, ChannelDetails)), + Name = pget(name, ChannelDetails), + #tracked_channel{ + id = rabbit_tracking:id(Node, Name), + name = Name, + node = Node, + vhost = pget(vhost, ChannelDetails), + pid = ChPid, + connection = pget(connection, ChannelDetails), + username = pget(user, ChannelDetails)}. + +close_channels(TrackedChannels = [#tracked_channel{}|_]) -> + [rabbit_channel:shutdown(ChPid) + || #tracked_channel{pid = ChPid} <- TrackedChannels], + ok; +close_channels(_TrackedChannels = []) -> ok. diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl new file mode 100644 index 0000000000..83e5343a7e --- /dev/null +++ b/src/rabbit_channel_tracking_handler.erl @@ -0,0 +1,71 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking_handler). + +%% This module keeps track of channel creation and termination events +%% on its local node. Similar to the rabbit_connection_tracking_handler, +%% the primary goal here is to decouple channel tracking from rabbit_reader +%% and isolate channel tracking to its own process to avoid blocking connection +%% creation events. Additionaly, creation events are also non-blocking in that +%% they spawn a short-live process for updating the tracking tables in realtime. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "channel tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [channel_tracking]}, + {enables, recovery}]}). + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = channel_created, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}), + {ok, State}; +handle_event(#event{type = channel_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}), + {ok, State}; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}), + {ok, State}; +%% A node had been deleted from the cluster. +handle_event(#event{type = node_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 9bfe36b925..33938a90cc 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -7,8 +7,6 @@ -module(rabbit_connection_tracking). --behaviour(gen_server). - %% Abstracts away how tracked connection records are stored %% and queried. %% @@ -17,20 +15,41 @@ %% * rabbit_connection_tracking_handler %% * rabbit_reader %% * rabbit_event +-behaviour(rabbit_tracking). -export([boot/0, - ensure_tracked_connections_table_for_node/1, + update_tracked/1, + handle_cast/1, + register_tracked/1, + unregister_tracked/1, + count_tracked_items_in/1, + clear_tracking_tables/0, + shutdown_tracked_items/2]). + +-export([ensure_tracked_connections_table_for_node/1, ensure_per_vhost_tracked_connections_table_for_node/1, + ensure_per_user_tracked_connections_table_for_node/1, + ensure_tracked_connections_table_for_this_node/0, ensure_per_vhost_tracked_connections_table_for_this_node/0, - tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, - delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, + ensure_per_user_tracked_connections_table_for_this_node/0, + + tracked_connection_table_name_for/1, + tracked_connection_per_vhost_table_name_for/1, + tracked_connection_per_user_table_name_for/1, + get_all_tracked_connection_table_names_for_node/1, + + delete_tracked_connections_table_for_node/1, + delete_per_vhost_tracked_connections_table_for_node/1, + delete_per_user_tracked_connections_table_for_node/1, + delete_tracked_connection_user_entry/1, + delete_tracked_connection_vhost_entry/1, + clear_tracked_connection_tables_for_this_node/0, - register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, - count_connections_in/1, lookup/1, count/0]). @@ -38,36 +57,47 @@ -import(rabbit_misc, [pget/2]). --export([start_link/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -export([close_connections/3]). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - %% -%% GenServer API +%% API %% -init([]) -> - {ok, nostate}. +%% Behaviour callbacks + +-spec boot() -> ok. + +%% Sets up and resets connection tracking tables for this +%% node. +boot() -> + ensure_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for connection tracking on this node: ~p", + [tracked_connection_table_name_for(node())]), + ensure_per_vhost_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", + [tracked_connection_per_vhost_table_name_for(node())]), + ensure_per_user_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p", + [tracked_connection_per_user_table_name_for(node())]), + clear_tracking_tables(), + ok. -handle_call(_Msg, _From, nostate) -> - {reply, ok, nostate}. +-spec update_tracked(term()) -> ok. +update_tracked(Event) -> + spawn(?MODULE, handle_cast, [Event]). -handle_cast({connection_created, Details}, nostate) -> +%% Asynchronously handle update events +-spec handle_cast(term()) -> ok. + +handle_cast({connection_created, Details}) -> ThisNode = node(), case pget(node, Details) of ThisNode -> TConn = tracked_connection_from_connection_created(Details), ConnId = TConn#tracked_connection.id, try - register_connection(TConn) + register_tracked(TConn) catch error:{no_exists, _} -> Msg = "Could not register connection ~p for tracking, " @@ -82,84 +112,114 @@ handle_cast({connection_created, Details}, nostate) -> _OtherNode -> %% ignore ok - end, - {noreply, nostate}; -handle_cast({connection_closed, Details}, nostate) -> + end; +handle_cast({connection_closed, Details}) -> ThisNode = node(), case pget(node, Details) of ThisNode -> %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, %% {pid,<0.1774.0>}, %% {node, rabbit@hostname}] - unregister_connection( - {pget(node, Details), - pget(name, Details)}); + unregister_tracked( + rabbit_tracking:id(ThisNode, pget(name, Details))); _OtherNode -> %% ignore ok - end, - {noreply, nostate}; -handle_cast({vhost_deleted, Details}, nostate) -> + end; +handle_cast({vhost_deleted, Details}) -> VHost = pget(name, Details), + %% Schedule vhost entry deletion, allowing time for connections to close + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_connection_vhost_entry, [VHost]), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), - close_connections(rabbit_connection_tracking:list(VHost), - rabbit_misc:format("vhost '~s' is deleted", [VHost])), - {noreply, nostate}; + shutdown_tracked_items( + rabbit_connection_tracking:list(VHost), + rabbit_misc:format("vhost '~s' is deleted", [VHost])); %% Note: under normal circumstances this will be called immediately %% after the vhost_deleted above. Therefore we should be careful about %% what we log and be more defensive. -handle_cast({vhost_down, Details}, nostate) -> +handle_cast({vhost_down, Details}) -> VHost = pget(name, Details), Node = pget(node, Details), rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" " because the vhost is stopping", [VHost, Node]), - close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), - rabbit_misc:format("vhost '~s' is down", [VHost])), - {noreply, nostate}; -handle_cast({user_deleted, Details}, nostate) -> + shutdown_tracked_items( + rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])); +handle_cast({user_deleted, Details}) -> Username = pget(name, Details), + %% Schedule user entry deletion, allowing time for connections to close + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_connection_user_entry, [Username]), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), - close_connections(rabbit_connection_tracking:list_of_user(Username), - rabbit_misc:format("user '~s' is deleted", [Username])), - {noreply, nostate}; + shutdown_tracked_items( + rabbit_connection_tracking:list_of_user(Username), + rabbit_misc:format("user '~s' is deleted", [Username])); %% A node had been deleted from the cluster. -handle_cast({node_deleted, Details}, nostate) -> +handle_cast({node_deleted, Details}) -> Node = pget(node, Details), rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), - rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node), - rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node), - {noreply, nostate}; -handle_cast(_Msg, nostate) -> - {noreply, nostate}. + delete_tracked_connections_table_for_node(Node), + delete_per_vhost_tracked_connections_table_for_node(Node), + delete_per_user_tracked_connections_table_for_node(Node). -handle_info(_Info, nostate) -> - {noreply, nostate}. +-spec register_tracked(rabbit_types:tracked_connection()) -> ok. +-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]). -terminate(_Reason, nostate) -> +register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), + %% upsert + case mnesia:dirty_read(TableName, ConnId) of + [] -> + mnesia:dirty_write(TableName, Conn), + mnesia:dirty_update_counter(PerVhostTableName, VHost, 1), + mnesia:dirty_update_counter(PerUserConnTableName, Username, 1); + [#tracked_connection{}] -> + ok + end, ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok. -%% -%% API -%% +unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), + case mnesia:dirty_read(TableName, ConnId) of + [] -> ok; + [#tracked_connection{vhost = VHost, username = Username}] -> + mnesia:dirty_update_counter(PerUserConnTableName, Username, -1), + mnesia:dirty_update_counter(PerVhostTableName, VHost, -1), + mnesia:dirty_delete(TableName, ConnId) + end. --spec boot() -> ok. +-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer(). -%% Sets up and resets connection tracking tables for this -%% node. -boot() -> - ensure_tracked_connections_table_for_this_node(), - rabbit_log:info("Setting up a table for connection tracking on this node: ~p", - [tracked_connection_table_name_for(node())]), - ensure_per_vhost_tracked_connections_table_for_this_node(), - rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", - [tracked_connection_per_vhost_table_name_for(node())]), - clear_tracked_connection_tables_for_this_node(), - ok. +count_tracked_items_in({vhost, VirtualHost}) -> + rabbit_tracking:count_tracked_items( + fun tracked_connection_per_vhost_table_name_for/1, + #tracked_connection_per_vhost.connection_count, VirtualHost, + "connections in vhost"); +count_tracked_items_in({user, Username}) -> + rabbit_tracking:count_tracked_items( + fun tracked_connection_per_user_table_name_for/1, + #tracked_connection_per_user.connection_count, Username, + "connections for user"). + +-spec clear_tracking_tables() -> ok. + +clear_tracking_tables() -> + clear_tracked_connection_tables_for_this_node(). + +-spec shutdown_tracked_items(list(), term()) -> ok. + +shutdown_tracked_items(TrackedItems, Message) -> + close_connections(TrackedItems, Message). +%% Extended API -spec ensure_tracked_connections_table_for_this_node() -> ok. @@ -173,6 +233,13 @@ ensure_per_vhost_tracked_connections_table_for_this_node() -> ensure_per_vhost_tracked_connections_table_for_node(node()). +-spec ensure_per_user_tracked_connections_table_for_this_node() -> ok. + +ensure_per_user_tracked_connections_table_for_this_node() -> + ensure_per_user_tracked_connections_table_for_node(node()). + + +%% Create tables -spec ensure_tracked_connections_table_for_node(node()) -> ok. ensure_tracked_connections_table_for_node(Node) -> @@ -186,7 +253,6 @@ ensure_tracked_connections_table_for_node(Node) -> ok end. - -spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok. ensure_per_vhost_tracked_connections_table_for_node(Node) -> @@ -200,45 +266,44 @@ ensure_per_vhost_tracked_connections_table_for_node(Node) -> ok end. +-spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok. + +ensure_per_user_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_user_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user}, + {attributes, record_info(fields, tracked_connection_per_user)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]), + ok + end. -spec clear_tracked_connection_tables_for_this_node() -> ok. clear_tracked_connection_tables_for_this_node() -> - case mnesia:clear_table(tracked_connection_table_name_for(node())) of - {atomic, ok} -> ok; - {aborted, _} -> ok - end, - case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of - {atomic, ok} -> ok; - {aborted, _} -> ok - end. - + [rabbit_tracking:clear_tracking_table(T) + || T <- get_all_tracked_connection_table_names_for_node(node())]. -spec delete_tracked_connections_table_for_node(node()) -> ok. delete_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_table_name_for(Node), - case mnesia:delete_table(TableName) of - {atomic, ok} -> ok; - {aborted, {no_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. - + rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection"). -spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok. delete_per_vhost_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:delete_table(TableName) of - {atomic, ok} -> ok; - {aborted, {no_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. + rabbit_tracking:delete_tracked_table(TableName, Node, + "per-vhost tracked connection"). +-spec delete_per_user_tracked_connections_table_for_node(node()) -> ok. + +delete_per_user_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_user_table_name_for(Node), + rabbit_tracking:delete_tracked_table(TableName, Node, + "per-user tracked connection"). -spec tracked_connection_table_name_for(node()) -> atom(). @@ -250,41 +315,23 @@ tracked_connection_table_name_for(Node) -> tracked_connection_per_vhost_table_name_for(Node) -> list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])). +-spec tracked_connection_per_user_table_name_for(node()) -> atom(). --spec register_connection(rabbit_types:tracked_connection()) -> ok. --dialyzer([{nowarn_function, [register_connection/1]}, race_conditions]). +tracked_connection_per_user_table_name_for(Node) -> + list_to_atom(rabbit_misc:format( + "tracked_connection_table_per_user_on_node_~s", [Node])). -register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - %% upsert - case mnesia:dirty_read(TableName, ConnId) of - [] -> - mnesia:dirty_write(TableName, Conn), - mnesia:dirty_update_counter( - PerVhostTableName, VHost, 1); - [_Row] -> - ok - end, - ok. +-spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()]. --spec unregister_connection(rabbit_types:connection_name()) -> ok. - -unregister_connection(ConnId = {Node, _Name}) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:dirty_read(TableName, ConnId) of - [] -> ok; - [Row] -> - mnesia:dirty_update_counter( - PerVhostTableName, Row#tracked_connection.vhost, -1), - mnesia:dirty_delete(TableName, ConnId) - end. +get_all_tracked_connection_table_names_for_node(Node) -> + [tracked_connection_table_name_for(Node), + tracked_connection_per_vhost_table_name_for(Node), + tracked_connection_per_user_table_name_for(Node)]. -spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'. lookup(Name) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), lookup(Name, Nodes). lookup(_, []) -> @@ -303,7 +350,7 @@ list() -> fun (Node, Acc) -> Tab = tracked_connection_table_name_for(Node), Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). + end, [], rabbit_nodes:all_running()). -spec count() -> non_neg_integer(). @@ -312,17 +359,14 @@ count() -> fun (Node, Acc) -> Tab = tracked_connection_table_name_for(Node), Acc + mnesia:table_info(Tab, size) - end, 0, rabbit_mnesia:cluster_nodes(running)). + end, 0, rabbit_nodes:all_running()). -spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. list(VHost) -> - lists:foldl( - fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). - + rabbit_tracking:match_tracked_items( + fun tracked_connection_table_name_for/1, + #tracked_connection{vhost = VHost, _ = '_'}). -spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. @@ -346,32 +390,23 @@ list_on_node(Node, VHost) -> -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. list_of_user(Username) -> - lists:foldl( - fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - Acc ++ mnesia:dirty_match_object( - Tab, - #tracked_connection{username = Username, _ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). - --spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). - -count_connections_in(VirtualHost) -> - lists:foldl(fun (Node, Acc) -> - Tab = tracked_connection_per_vhost_table_name_for(Node), - try - N = case mnesia:dirty_read(Tab, VirtualHost) of - [] -> 0; - [Val] -> Val#tracked_connection_per_vhost.connection_count - end, - Acc + N - catch _:Err -> - rabbit_log:error( - "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n", - [VirtualHost, Err, Node]), - Acc - end - end, 0, rabbit_mnesia:cluster_nodes(running)). + rabbit_tracking:match_tracked_items( + fun tracked_connection_table_name_for/1, + #tracked_connection{username = Username, _ = '_'}). + +%% Internal, delete tracked entries + +delete_tracked_connection_vhost_entry(Vhost) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_vhost, exists, [Vhost]}, + fun tracked_connection_per_vhost_table_name_for/1, + Vhost). + +delete_tracked_connection_user_entry(Username) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_auth_backend_internal, exists, [Username]}, + fun tracked_connection_per_user_table_name_for/1, + Username). %% Returns a #tracked_connection from connection_created %% event details. @@ -419,7 +454,7 @@ tracked_connection_from_connection_created(EventDetails) -> %% {connected_at,1453214290847}] Name = pget(name, EventDetails), Node = pget(node, EventDetails), - #tracked_connection{id = {Node, Name}, + #tracked_connection{id = rabbit_tracking:id(Node, Name), name = Name, node = Node, vhost = pget(vhost, EventDetails), @@ -476,4 +511,3 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. Node = node(Pid), rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). - diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index c7917e0487..8b9e44445f 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -22,7 +22,6 @@ -export([close_connections/3]). -include_lib("rabbit.hrl"). --import(rabbit_misc, [pget/2]). -rabbit_boot_step({?MODULE, [{description, "connection tracking event handler"}, @@ -30,16 +29,9 @@ [rabbit_event, ?MODULE, []]}}, {cleanup, {gen_event, delete_handler, [rabbit_event, ?MODULE, []]}}, - {requires, [rabbit_connection_tracking]}, + {requires, [connection_tracking]}, {enables, recovery}]}). --rabbit_boot_step({rabbit_connection_tracking, - [{description, "statistics event manager"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_connection_tracking]}}, - {requires, [rabbit_event, rabbit_node_monitor]}, - {enables, ?MODULE}]}). - %% %% API %% @@ -48,26 +40,26 @@ init([]) -> {ok, []}. handle_event(#event{type = connection_created, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {connection_created, Details}), + _Pid = rabbit_connection_tracking:update_tracked({connection_created, Details}), {ok, State}; handle_event(#event{type = connection_closed, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}), + _Pid = rabbit_connection_tracking:update_tracked({connection_closed, Details}), {ok, State}; handle_event(#event{type = vhost_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({vhost_deleted, Details}), {ok, State}; %% Note: under normal circumstances this will be called immediately %% after the vhost_deleted above. Therefore we should be careful about %% what we log and be more defensive. handle_event(#event{type = vhost_down, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}), + _Pid = rabbit_connection_tracking:update_tracked({vhost_down, Details}), {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({user_deleted, Details}), {ok, State}; %% A node had been deleted from the cluster. handle_event(#event{type = node_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({node_deleted, Details}), {ok, State}; handle_event(_Event, State) -> {ok, State}. diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index 63aa9b30f1..a251def11c 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -10,7 +10,8 @@ -export([quorum_queue_migration/3, implicit_default_bindings_migration/3, virtual_host_metadata_migration/3, - maintenance_mode_status_migration/3]). + maintenance_mode_status_migration/3, + user_limits_migration/3]). -rabbit_feature_flag( {quorum_queue, @@ -42,6 +43,13 @@ migration_fun => {?MODULE, maintenance_mode_status_migration} }}). +-rabbit_feature_flag( + {user_limits, + #{desc => "Configure connection and channel limits for a user", + stability => stable, + migration_fun => {?MODULE, user_limits_migration} + }}). + %% ------------------------------------------------------------------- %% Quorum queues. %% ------------------------------------------------------------------- @@ -141,3 +149,18 @@ maintenance_mode_status_migration(FeatureName, _FeatureProps, enable) -> end; maintenance_mode_status_migration(_FeatureName, _FeatureProps, is_enabled) -> rabbit_table:exists(rabbit_maintenance:status_table_name()). + +%% ------------------------------------------------------------------- +%% User limits. +%% ------------------------------------------------------------------- + +user_limits_migration(_FeatureName, _FeatureProps, enable) -> + Tab = rabbit_user, + rabbit_table:wait([Tab], _Retry = true), + Fun = fun(Row) -> internal_user:upgrade_to(internal_user_v2, Row) end, + case mnesia:transform_table(Tab, Fun, internal_user:fields(internal_user_v2)) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end; +user_limits_migration(_FeatureName, _FeatureProps, is_enabled) -> + mnesia:table_info(rabbit_user, attributes) =:= internal_user:fields(internal_user_v2). diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index f67e6dca48..9920015738 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -700,14 +700,15 @@ vhost_definition(VHost) -> }. list_users() -> - [begin - {ok, User} = rabbit_auth_backend_internal:lookup_user(pget(user, U)), - #{<<"name">> => User#internal_user.username, - <<"password_hash">> => base64:encode(User#internal_user.password_hash), - <<"hashing_algorithm">> => rabbit_auth_backend_internal:hashing_module_for_user(User), - <<"tags">> => tags_as_binaries(User#internal_user.tags) - } - end || U <- rabbit_auth_backend_internal:list_users()]. + [user_definition(U) || U <- rabbit_auth_backend_internal:all_users()]. + +user_definition(User) -> + #{<<"name">> => internal_user:get_username(User), + <<"password_hash">> => base64:encode(internal_user:get_password_hash(User)), + <<"hashing_algorithm">> => rabbit_auth_backend_internal:hashing_module_for_user(User), + <<"tags">> => tags_as_binaries(internal_user:get_tags(User)), + <<"limits">> => internal_user:get_limits(User) + }. list_runtime_parameters() -> [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list(), is_list(P)]. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index e6c251371a..f08a29f829 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -43,7 +43,7 @@ list_local() -> -spec list() -> [pid()]. list() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), + rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(), rabbit_direct, list_local, []). %%---------------------------------------------------------------------------- @@ -80,7 +80,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> undefined -> {error, broker_is_booting}; _ -> - case is_over_connection_limit(VHost, Creds, Pid) of + case is_over_vhost_connection_limit(VHost, Creds, Pid) of true -> {error, not_allowed}; false -> @@ -100,7 +100,7 @@ connect(Creds, VHost, Protocol, Pid, Infos) -> {error, {auth_failure, "Refused"}} end %% AuthFun() end %% is_vhost_alive - end %% is_over_connection_limit + end %% is_over_vhost_connection_limit end; false -> {error, broker_not_found_on_node} end. @@ -146,7 +146,7 @@ is_vhost_alive(VHost, {Username, _Password}, Pid) -> false end. -is_over_connection_limit(VHost, {Username, _Password}, Pid) -> +is_over_vhost_connection_limit(VHost, {Username, _Password}, Pid) -> PrintedUsername = case Username of none -> ""; _ -> Username @@ -157,7 +157,7 @@ is_over_connection_limit(VHost, {Username, _Password}, Pid) -> rabbit_log_connection:error( "Error on Direct connection ~p~n" "access to vhost '~s' refused for user '~s': " - "connection limit (~p) is reached", + "vhost connection limit (~p) is reached", [Pid, VHost, PrintedUsername, Limit]), true catch @@ -174,19 +174,30 @@ notify_auth_result(Username, AuthResult, ExtraProps) -> ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -connect1(User, VHost, Protocol, Pid, Infos) -> - % Note: peer_host can be either a tuple or - % a binary if reverse_dns_lookups is enabled - PeerHost = proplists:get_value(peer_host, Infos), - AuthzContext = proplists:get_value(variable_map, Infos, #{}), - try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_core_metrics:connection_created(Pid, Infos), - rabbit_event:notify(connection_created, Infos), - {ok, {User, rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = Reason = not_allowed} -> - {error, Reason} +connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> + case rabbit_auth_backend_internal:is_over_connection_limit(Username) of + false -> + % Note: peer_host can be either a tuple or + % a binary if reverse_dns_lookups is enabled + PeerHost = proplists:get_value(peer_host, Infos), + AuthzContext = proplists:get_value(variable_map, Infos, #{}), + try rabbit_access_control:check_vhost_access(User, VHost, + {ip, PeerHost}, AuthzContext) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_core_metrics:connection_created(Pid, Infos), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = Reason = not_allowed} -> + {error, Reason} + end; + {true, Limit} -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access refused for user '~s': " + "user connection limit (~p) is reached", + [Pid, Username, Limit]), + {error, not_allowed} end. -spec start_channel @@ -195,14 +206,25 @@ connect1(User, VHost, Protocol, Pid, Infos) -> rabbit_framing:amqp_table(), pid(), any()) -> {'ok', pid()}. -start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, Collector, AmqpParams) -> - {ok, _, {ChannelPid, _}} = - supervisor2:start_child( - rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector, AmqpParams}]), - {ok, ChannelPid}. +start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, + User = #user{username = Username}, VHost, Capabilities, + Collector, AmqpParams) -> + case rabbit_auth_backend_internal:is_over_channel_limit(Username) of + false -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, Collector, AmqpParams}]), + {ok, ChannelPid}; + {true, Limit} -> + rabbit_log_connection:error( + "Error on direct connection ~p~n" + "number of channels opened for user '~s' has reached the " + "maximum allowed limit of (~w)", + [ConnPid, Username, Limit]), + {error, not_allowed} + end. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 9676ec3716..02f590e2fb 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -152,7 +152,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> slaves_to_start_on_failure(Q, DeadGMPids) -> %% In case Mnesia has not caught up yet, filter out nodes we know %% to be dead.. - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + ClusterNodes = rabbit_nodes:all_running() -- [node(P) || P <- DeadGMPids], {_, OldNodes, _} = actual_queue_nodes(Q), {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes), @@ -234,16 +234,16 @@ add_mirror(QName, MirrorNode, SyncMode) -> #resource{virtual_host = VHost} = amqqueue:get_name(Q), case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of {ok, _} -> - try + try SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), rabbit_mirror_queue_slave:go(SPid, SyncMode) - of + of _ -> ok catch - error:QError -> + error:QError -> log_warning(QName, "Unable to start queue mirror on node '~p'. " "Target queue supervisor is not running: ~p~n", @@ -320,7 +320,7 @@ store_updated_slaves(Q0) when ?is_amqqueue(Q0) -> %% a long time without being removed. update_recoverable(SPids, RS) -> SNodes = [node(SPid) || SPid <- SPids], - RunningNodes = rabbit_mnesia:cluster_nodes(running), + RunningNodes = rabbit_nodes:all_running(), AddNodes = SNodes -- RS, DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave (RS -- DelNodes) ++ AddNodes. @@ -384,7 +384,7 @@ suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, rabbit_nodes:all_runni suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All). %% The third argument exists so we can pull a call to -%% rabbit_mnesia:cluster_nodes(running) out of a loop or transaction +%% rabbit_nodes:all_running() out of a loop or transaction %% or both. suggested_queue_nodes(Q, DefNode, All) when ?is_amqqueue(Q) -> Owner = amqqueue:get_exclusive_owner(Q), diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index 341e639f45..e0d88c0f5e 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -233,7 +233,7 @@ update_term(_NodeMap, Term) -> rename_in_running_mnesia(FromNode, ToNode) -> All = rabbit_mnesia:cluster_nodes(all), - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), case {lists:member(FromNode, Running), lists:member(ToNode, All)} of {false, true} -> ok; {true, _} -> exit({old_node_running, FromNode}); diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 63760cd2e3..bfb1424b13 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -389,7 +389,7 @@ unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). -spec connections() -> [rabbit_types:connection()]. connections() -> - rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), + rabbit_misc:append_rpc_all_nodes(rabbit_nodes:all_running(), rabbit_networking, connections_local, []). -spec local_connections() -> [rabbit_types:connection()]. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 3408ea5a51..b56180c54c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -162,7 +162,7 @@ notify_node_up() -> -spec notify_joined_cluster() -> 'ok'. notify_joined_cluster() -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + Nodes = rabbit_nodes:all_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, {joined_cluster, node(), rabbit_mnesia:node_type()}), ok. @@ -170,7 +170,7 @@ notify_joined_cluster() -> -spec notify_left_cluster(node()) -> 'ok'. notify_left_cluster(Node) -> - Nodes = rabbit_mnesia:cluster_nodes(running), + Nodes = rabbit_nodes:all_running(), gen_server:abcast(Nodes, ?SERVER, {left_cluster, Node}), ok. @@ -375,7 +375,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(notify_node_up, State = #state{guid = GUID}) -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + Nodes = rabbit_nodes:all_running() -- [node()], gen_server:abcast(Nodes, ?SERVER, {node_up, node(), rabbit_mnesia:node_type(), GUID}), %% register other active rabbits with this rabbit @@ -425,7 +425,7 @@ handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso + case lists:member(Node, rabbit_nodes:all_running()) andalso maps:find(Node, GUIDs) =:= {ok, NodeGUID} of true -> spawn_link( %%[1] fun () -> @@ -572,7 +572,7 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, Node, node(), DownGUID, CheckGUID, MyGUID}) end, case maps:find(Node, GUIDs) of - {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) + {ok, DownGUID} -> Alive = rabbit_nodes:all_running() -- [node(), Node], [case maps:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); @@ -771,7 +771,7 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, %% going away. It's only safe to forget anything about partitions when %% there are no partitions. Down = Partitions -- alive_rabbit_nodes(), - NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running), + NoLongerPartitioned = rabbit_nodes:all_running(), Partitions1 = case Partitions -- Down -- NoLongerPartitioned of [] -> []; _ -> Partitions @@ -853,7 +853,7 @@ disconnect(Node) -> %%-------------------------------------------------------------------- %% mnesia:system_info(db_nodes) (and hence -%% rabbit_mnesia:cluster_nodes(running)) does not return all nodes +%% rabbit_nodes:all_running()) does not return all nodes %% when partitioned, just those that we are sharing Mnesia state %% with. So we have a small set of replacement functions %% here. "rabbit" in a function's name implies we test if the rabbit @@ -917,7 +917,7 @@ ping_all() -> ok. possibly_partitioned_nodes() -> - alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running). + alive_rabbit_nodes() -- rabbit_nodes:all_running(). startup_log([]) -> rabbit_log:info("Starting rabbit_node_monitor~n", []); diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6eff110ca3..915cf9d527 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -862,7 +862,7 @@ add_member(VHost, Name, Node, Timeout) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> QNodes = get_nodes(Q), - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of + case lists:member(Node, rabbit_nodes:all_running()) of false -> {error, node_not_running}; true -> @@ -993,7 +993,7 @@ shrink_all(Node) -> [{rabbit_amqqueue:name(), {ok, pos_integer()} | {error, pos_integer(), term()}}]. grow(Node, VhostSpec, QueueSpec, Strategy) -> - Running = rabbit_mnesia:cluster_nodes(running), + Running = rabbit_nodes:all_running(), [begin Size = length(get_nodes(Q)), QName = amqqueue:get_name(Q), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 79895d95fa..f9697c96e5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -909,17 +909,27 @@ create_channel(Channel, #connection{name = Name, protocol = Protocol, frame_max = FrameMax, - user = User, vhost = VHost, - capabilities = Capabilities}} = State) -> - {ok, _ChSupPid, {ChPid, AState}} = - rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, - Protocol, User, VHost, Capabilities, Collector}), - MRef = erlang:monitor(process, ChPid), - put({ch_pid, ChPid}, {Channel, MRef}), - put({channel, Channel}, {ChPid, AState}), - {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}. + capabilities = Capabilities, + user = #user{username = Username} = User} + } = State) -> + case rabbit_auth_backend_internal:is_over_channel_limit(Username) of + false -> + {ok, _ChSupPid, {ChPid, AState}} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name, + Protocol, User, VHost, Capabilities, + Collector}), + MRef = erlang:monitor(process, ChPid), + put({ch_pid, ChPid}, {Channel, MRef}), + put({channel, Channel}, {ChPid, AState}), + {ok, {ChPid, AState}, State#v1{channel_count = ChannelCount + 1}}; + {true, Limit} -> + {error, rabbit_misc:amqp_error(not_allowed, + "number of channels opened for user '~s' has reached " + "the maximum allowed user limit of (~w)", + [Username, Limit], 'none')} + end. channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> case get({ch_pid, ChPid}) of @@ -1218,7 +1228,8 @@ handle_method0(#'connection.open'{virtual_host = VHost}, sock = Sock, throttle = Throttle}) -> - ok = is_over_connection_limit(VHost, User), + ok = is_over_vhost_connection_limit(VHost, User), + ok = is_over_user_connection_limit(User), ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}, #{}), ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, @@ -1317,7 +1328,7 @@ is_vhost_alive(VHostPath, User) -> [VHostPath, User#user.username, VHostPath]) end. -is_over_connection_limit(VHostPath, User) -> +is_over_vhost_connection_limit(VHostPath, User) -> try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of false -> ok; {true, Limit} -> rabbit_misc:protocol_error(not_allowed, @@ -1329,6 +1340,14 @@ is_over_connection_limit(VHostPath, User) -> rabbit_misc:protocol_error(not_allowed, "vhost ~s not found", [VHostPath]) end. +is_over_user_connection_limit(#user{username = Username}) -> + case rabbit_auth_backend_internal:is_over_connection_limit(Username) of + false -> ok; + {true, Limit} -> rabbit_misc:protocol_error(not_allowed, + "Connection refused for user '~s': " + "user connection limit (~p) is reached", + [Username, Limit]) + end. validate_negotiated_integer_value(Field, Min, ClientValue) -> ServerValue = get_env(Field), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 05f9a8d381..7df8844960 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -295,9 +295,9 @@ definitions(ram) -> definitions() -> [{rabbit_user, [{record_name, internal_user}, - {attributes, record_info(fields, internal_user)}, + {attributes, internal_user:fields()}, {disc_copies, [node()]}, - {match, #internal_user{_='_'}}]}, + {match, internal_user:pattern_match_all()}]}, {rabbit_user_permission, [{record_name, user_permission}, {attributes, record_info(fields, user_permission)}, diff --git a/src/rabbit_tracking.erl b/src/rabbit_tracking.erl new file mode 100644 index 0000000000..a124d20226 --- /dev/null +++ b/src/rabbit_tracking.erl @@ -0,0 +1,103 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_tracking). + +%% Common behaviour and processing functions for tracking components +%% +%% See in use: +%% * rabbit_connection_tracking +%% * rabbit_channel_tracking + +-callback boot() -> ok. +-callback update_tracked(term()) -> ok. +-callback handle_cast(term()) -> ok. +-callback register_tracked( + rabbit_types:tracked_connection() | + rabbit_types:tracked_channel()) -> 'ok'. +-callback unregister_tracked( + rabbit_types:tracked_connection_id() | + rabbit_types:tracked_channel_id()) -> 'ok'. +-callback count_tracked_items_in(term()) -> non_neg_integer(). +-callback clear_tracking_tables() -> 'ok'. +-callback shutdown_tracked_items(list(), term()) -> ok. + +-export([id/2, count_tracked_items/4, match_tracked_items/2, + clear_tracking_table/1, delete_tracking_table/3, + delete_tracked_entry/3]). + +%%---------------------------------------------------------------------------- + +-spec id(atom(), term()) -> + rabbit_types:tracked_connection_id() | rabbit_types:tracked_channel_id(). + +id(Node, Name) -> {Node, Name}. + +-spec count_tracked_items(function(), integer(), term(), string()) -> + non_neg_integer(). + +count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) -> + lists:foldl(fun (Node, Acc) -> + Tab = TableNameFun(Node), + try + N = case mnesia:dirty_read(Tab, Key) of + [] -> 0; + [Val] -> + element(CountRecPosition, Val) + end, + Acc + N + catch _:Err -> + rabbit_log:error( + "Failed to fetch number of ~p ~p on node ~p:~n~p~n", + [ContextMsg, Key, Node, Err]), + Acc + end + end, 0, rabbit_nodes:all_running()). + +-spec match_tracked_items(function(), tuple()) -> term(). + +match_tracked_items(TableNameFun, MatchSpec) -> + lists:foldl( + fun (Node, Acc) -> + Tab = TableNameFun(Node), + Acc ++ mnesia:dirty_match_object( + Tab, + MatchSpec) + end, [], rabbit_nodes:all_running()). + +-spec clear_tracking_table(atom()) -> ok. + +clear_tracking_table(TableName) -> + case mnesia:clear_table(TableName) of + {atomic, ok} -> ok; + {aborted, _} -> ok + end. + +-spec delete_tracking_table(atom(), node(), string()) -> ok. + +delete_tracking_table(TableName, Node, ContextMsg) -> + case mnesia:delete_table(TableName) of + {atomic, ok} -> ok; + {aborted, {no_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to delete a ~p table for node ~p: ~p", + [ContextMsg, Node, Error]), + ok + end. + +-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok. + +delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) -> + ClusterNodes = rabbit_nodes:all_running(), + ExistsInCluster = + lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes), + case ExistsInCluster of + false -> + [mnesia:dirty_delete(TableNameFun(Node), Key) || Node <- ClusterNodes]; + true -> + ok + end. diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ba72ecaebc..b1b128fecc 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -145,7 +145,7 @@ run_mnesia_upgrades(Upgrades, AllNodes) -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:cluster_nodes(running) -- [node()], + AfterUs = rabbit_nodes:all_running() -- [node()], case {node_type_legacy(), AfterUs} of {disc, []} -> primary; diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index ce5532ba54..bee01f3054 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -90,7 +90,8 @@ is_over_connection_limit(VirtualHost) -> %% with limit = 0, no connections are allowed {ok, 0} -> {true, 0}; {ok, Limit} when is_integer(Limit) andalso Limit > 0 -> - ConnectionCount = rabbit_connection_tracking:count_connections_in(VirtualHost), + ConnectionCount = + rabbit_connection_tracking:count_tracked_items_in({vhost, VirtualHost}), case ConnectionCount >= Limit of false -> false; true -> {true, Limit} diff --git a/test/per_user_connection_channel_limit_SUITE.erl b/test/per_user_connection_channel_limit_SUITE.erl new file mode 100644 index 0000000000..35745d65f8 --- /dev/null +++ b/test/per_user_connection_channel_limit_SUITE.erl @@ -0,0 +1,1625 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(per_user_connection_channel_limit_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_2_direct} + ]. + +groups() -> + ClusterSize1Tests = [ + most_basic_single_node_connection_and_channel_count, + single_node_single_user_connection_and_channel_count, + single_node_multiple_users_connection_and_channel_count, + single_node_list_in_user, + single_node_single_user_limit, + single_node_single_user_zero_limit, + single_node_single_user_clear_limits, + single_node_multiple_users_clear_limits, + single_node_multiple_users_limit, + single_node_multiple_users_zero_limit + + ], + ClusterSize2Tests = [ + most_basic_cluster_connection_and_channel_count, + cluster_single_user_connection_and_channel_count, + cluster_multiple_users_connection_and_channel_count, + cluster_node_restart_connection_and_channel_count, + cluster_node_list_on_node, + cluster_single_user_limit, + cluster_single_user_limit2, + cluster_single_user_zero_limit, + cluster_single_user_clear_limits, + cluster_multiple_users_clear_limits, + cluster_multiple_users_zero_limit + ], + [ + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2); + +init_per_group(cluster_rename, Config) -> + init_per_multinode_group(cluster_rename, Config, 2). + +init_per_multinode_group(Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + case Group of + cluster_rename -> + % The broker is managed by {init,end}_per_testcase(). + Config1; + _ -> + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()) + end. + +end_per_group(cluster_rename, Config) -> + % The broker is managed by {init,end}_per_testcase(). + Config; +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + clear_all_connection_tracking_tables(Config), + clear_all_channel_tracking_tables(Config), + Config. + +end_per_testcase(Testcase, Config) -> + clear_all_connection_tracking_tables(Config), + clear_all_channel_tracking_tables(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +clear_all_connection_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_connection_tracking, + clear_tracking_tables, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +clear_all_channel_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_channel_tracking, + clear_tracking_tables, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +most_basic_single_node_connection_and_channel_count(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn] = open_connections(Config, [0]), + [Chan] = open_channels(Conn, 1), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 1 + end), + close_channels([Chan]), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + close_connections([Conn]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end). + +single_node_single_user_connection_and_channel_count(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn1] = open_connections(Config, [0]), + [Chan1] = open_channels(Conn1, 1), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 1 + end), + close_channels([Chan1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end), + + [Conn2] = open_connections(Config, [0]), + Chans2 = [_|_] = open_channels(Conn2, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 2 andalso + count_channels_of_user(Config, Username) =:= 10 + end), + + [Conn4] = open_connections(Config, [0]), + _Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 3 andalso + count_channels_of_user(Config, Username) =:= 15 + end), + + close_connections([Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 2 andalso + count_channels_of_user(Config, Username) =:= 10 + end), + + [Conn5] = open_connections(Config, [0]), + Chans5 = [_|_] = open_channels(Conn5, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 3 andalso + count_channels_of_user(Config, Username) =:= 15 + end), + + close_channels(Chans2 ++ Chans3 ++ Chans5), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn5]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end). + +single_node_multiple_users_connection_and_channel_count(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username1) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + [Conn1] = open_connections(Config, [{0, Username1}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + close_channels(Chans1), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 + end), + ?assertEqual(0, count_channels_of_user(Config, Username1)), + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username1) =:= 0 + end), + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 1 andalso + count_channels_of_user(Config, Username2) =:= 5 + end), + + [Conn3] = open_connections(Config, [{0, Username1}]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 1 andalso + count_channels_of_user(Config, Username2) =:= 5 + end), + + [Conn4] = open_connections(Config, [{0, Username1}]), + _Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 2 andalso + count_channels_of_user(Config, Username1) =:= 10 + end), + + close_connections([Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + + [Conn5] = open_connections(Config, [{0, Username2}]), + Chans5 = [_|_] = open_channels(Conn5, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 2 andalso + count_channels_of_user(Config, Username2) =:= 10 + end), + + [Conn6] = open_connections(Config, [{0, Username2}]), + Chans6 = [_|_] = open_channels(Conn6, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 3 andalso + count_channels_of_user(Config, Username2) =:= 15 + end), + + close_channels(Chans2 ++ Chans3 ++ Chans5 ++ Chans6), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn5, Conn6]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + rabbit_ct_broker_helpers:delete_user(Config, Username1), + rabbit_ct_broker_helpers:delete_user(Config, Username2). + +single_node_list_in_user(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + ?assertEqual(0, length(connections_in(Config, Username1))), + ?assertEqual(0, length(connections_in(Config, Username2))), + ?assertEqual(0, length(channels_in(Config, Username1))), + ?assertEqual(0, length(channels_in(Config, Username2))), + + [Conn1] = open_connections(Config, [{0, Username1}]), + [Chan1] = open_channels(Conn1, 1), + [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + [#tracked_channel{username = Username1}] = channels_in(Config, Username1), + close_channels([Chan1]), + ?assertEqual(0, length(channels_in(Config, Username1))), + close_connections([Conn1]), + ?assertEqual(0, length(connections_in(Config, Username1))), + + [Conn2] = open_connections(Config, [{0, Username2}]), + [Chan2] = open_channels(Conn2, 1), + [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + [#tracked_channel{username = Username2}] = channels_in(Config, Username2), + + [Conn3] = open_connections(Config, [{0, Username1}]), + [Chan3] = open_channels(Conn3, 1), + [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + [#tracked_channel{username = Username1}] = channels_in(Config, Username1), + + [Conn4] = open_connections(Config, [{0, Username1}]), + [_Chan4] = open_channels(Conn4, 1), + close_connections([Conn4]), + [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + [#tracked_channel{username = Username1}] = channels_in(Config, Username1), + + [Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]), + [Chan5] = open_channels(Conn5, 1), + [Chan6] = open_channels(Conn6, 1), + [<<"guest1">>, <<"guest2">>] = + lists:usort(lists:map(fun (#tracked_connection{username = V}) -> V end, + all_connections(Config))), + [<<"guest1">>, <<"guest2">>] = + lists:usort(lists:map(fun (#tracked_channel{username = V}) -> V end, + all_channels(Config))), + + close_channels([Chan2, Chan3, Chan5, Chan6]), + ?assertEqual(0, length(all_channels(Config))), + + close_connections([Conn2, Conn3, Conn5, Conn6]), + ?assertEqual(0, length(all_connections(Config))), + + rabbit_ct_broker_helpers:delete_user(Config, Username1), + rabbit_ct_broker_helpers:delete_user(Config, Username2). + +most_basic_cluster_connection_and_channel_count(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + ?assertEqual(1, count_connections_of_user(Config, Username)), + ?assertEqual(5, count_channels_of_user(Config, Username)), + + [Conn2] = open_connections(Config, [1]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(2, count_connections_of_user(Config, Username)), + ?assertEqual(10, count_channels_of_user(Config, Username)), + + [Conn3] = open_connections(Config, [1]), + Chans3 = [_|_] = open_channels(Conn3, 5), + ?assertEqual(3, count_connections_of_user(Config, Username)), + ?assertEqual(15, count_channels_of_user(Config, Username)), + + close_channels(Chans1 ++ Chans2 ++ Chans3), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + close_connections([Conn1, Conn2, Conn3]), + ?assertEqual(0, count_connections_of_user(Config, Username)). + +cluster_single_user_connection_and_channel_count(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn1] = open_connections(Config, [0]), + _Chans1 = [_|_] = open_channels(Conn1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn2] = open_connections(Config, [1]), + Chans2 = [_|_] = open_channels(Conn2, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 2 andalso + count_channels_of_user(Config, Username) =:= 10 + end), + + [Conn4] = open_connections(Config, [1]), + Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 3 andalso + count_channels_of_user(Config, Username) =:= 15 + end), + + close_channels(Chans2 ++ Chans3 ++ Chans4), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end). + +cluster_multiple_users_connection_and_channel_count(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + [Conn1] = open_connections(Config, [{0, Username1}]), + _Chans1 = [_|_] = open_channels(Conn1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username1) =:= 0 + end), + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 1 andalso + count_channels_of_user(Config, Username2) =:= 5 + end), + + [Conn3] = open_connections(Config, [{1, Username1}]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 1 andalso + count_channels_of_user(Config, Username2) =:= 5 + end), + + [Conn4] = open_connections(Config, [{0, Username1}]), + _Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 2 andalso + count_channels_of_user(Config, Username1) =:= 10 + end), + + close_connections([Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_channels_of_user(Config, Username1) =:= 5 + end), + + [Conn5] = open_connections(Config, [{1, Username2}]), + Chans5 = [_|_] = open_channels(Conn5, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 2 andalso + count_channels_of_user(Config, Username2) =:= 10 + end), + + [Conn6] = open_connections(Config, [{0, Username2}]), + Chans6 = [_|_] = open_channels(Conn6, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 3 andalso + count_channels_of_user(Config, Username2) =:= 15 + end), + + close_channels(Chans2 ++ Chans3 ++ Chans5 ++ Chans6), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn5, Conn6]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + rabbit_ct_broker_helpers:delete_user(Config, Username1), + rabbit_ct_broker_helpers:delete_user(Config, Username2). + +cluster_node_restart_connection_and_channel_count(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn1] = open_connections(Config, [0]), + _Chans1 = [_|_] = open_channels(Conn1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn2] = open_connections(Config, [1]), + Chans2 = [_|_] = open_channels(Conn2, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 2 andalso + count_channels_of_user(Config, Username) =:= 10 + end), + + [Conn4] = open_connections(Config, [1]), + _Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 3 andalso + count_channels_of_user(Config, Username) =:= 15 + end), + + [Conn5] = open_connections(Config, [1]), + Chans5 = [_|_] = open_channels(Conn5, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 4 andalso + count_channels_of_user(Config, Username) =:= 20 + end), + + rabbit_ct_broker_helpers:restart_broker(Config, 1), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 andalso + count_channels_of_user(Config, Username) =:= 5 + end), + + close_channels(Chans2 ++ Chans3 ++ Chans5), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4, Conn5]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end). + +cluster_node_list_on_node(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + rabbit_ct_helpers:await_condition( + fun () -> + length(all_connections(Config)) =:= 0 andalso + length(all_channels(Config)) =:= 0 andalso + length(connections_on_node(Config, 0)) =:= 0 andalso + length(channels_on_node(Config, 0)) =:= 0 + end), + + [Conn1] = open_connections(Config, [0]), + _Chans1 = [_|_] = open_channels(Conn1, 5), + [#tracked_connection{node = A}] = connections_on_node(Config, 0), + rabbit_ct_helpers:await_condition( + fun () -> + length([Ch || Ch <- channels_on_node(Config, 0), Ch#tracked_channel.node =:= A]) =:= 5 + end), + close_connections([Conn1]), + rabbit_ct_helpers:await_condition( + fun () -> + length(connections_on_node(Config, 0)) =:= 0 andalso + length(channels_on_node(Config, 0)) =:= 0 + end), + + [Conn2] = open_connections(Config, [1]), + _Chans2 = [_|_] = open_channels(Conn2, 5), + [#tracked_connection{node = B}] = connections_on_node(Config, 1), + rabbit_ct_helpers:await_condition( + fun () -> + length([Ch || Ch <- channels_on_node(Config, 1), Ch#tracked_channel.node =:= B]) =:= 5 + end), + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + rabbit_ct_helpers:await_condition( + fun () -> + length(connections_on_node(Config, 0)) =:= 1 andalso + length(channels_on_node(Config, 0)) =:= 5 + end), + + [Conn4] = open_connections(Config, [1]), + _Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + length(connections_on_node(Config, 1)) =:= 2 andalso + length(channels_on_node(Config, 1)) =:= 10 + end), + + close_connections([Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + length(connections_on_node(Config, 1)) =:= 1 andalso + length(channels_on_node(Config, 1)) =:= 5 + end), + + [Conn5] = open_connections(Config, [0]), + Chans5 = [_|_] = open_channels(Conn5, 5), + rabbit_ct_helpers:await_condition( + fun () -> + length(connections_on_node(Config, 0)) =:= 2 andalso + length(channels_on_node(Config, 0)) =:= 10 + end), + + rabbit_ct_broker_helpers:stop_broker(Config, 1), + await_running_node_refresh(Config, 0), + + rabbit_ct_helpers:await_condition( + fun () -> + length(all_connections(Config)) =:= 2 andalso + length(all_channels(Config)) =:= 10 + end), + + close_channels(Chans3 ++ Chans5), + rabbit_ct_helpers:await_condition( + fun () -> + length(all_channels(Config)) =:= 0 + end), + + close_connections([Conn3, Conn5]), + rabbit_ct_helpers:await_condition( + fun () -> + length(all_connections(Config)) =:= 0 + end), + + rabbit_ct_broker_helpers:start_broker(Config, 1). + +single_node_single_user_limit(Config) -> + single_node_single_user_limit_with(Config, 5, 25), + single_node_single_user_limit_with(Config, -1, -1). + +single_node_single_user_limit_with(Config, ConnLimit, ChLimit) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 3, 15), + + ?assertEqual(0, count_connections_of_user(Config, Username)), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + [Conn1, Conn2, Conn3] = Conns1 = open_connections(Config, [0, 0, 0]), + [_Chans1, Chans2, Chans3] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_channel_is_rejected(Conn1), + + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn2) andalso + is_process_alive(Conn3) + end), + + set_user_connection_and_channel_limit(Config, Username, ConnLimit, ChLimit), + [Conn4, Conn5] = Conns2 = open_connections(Config, [0, 0]), + [Chans4, Chans5] = [open_channels(Conn, 5) || Conn <- Conns2], + + close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn1, Conn2, Conn3, Conn4, Conn5]), + ?assertEqual(0, count_connections_of_user(Config, Username)), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +single_node_single_user_zero_limit(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 0, 0), + + ?assertEqual(0, count_connections_of_user(Config, Username)), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config), + expect_that_client_connection_is_rejected(Config), + expect_that_client_connection_is_rejected(Config), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username, 1, 0), + [ConnA] = open_connections(Config, [0]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 + end), + expect_that_client_channel_is_rejected(ConnA), + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(ConnA) =:= false andalso + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username, -1, -1), + [Conn1, Conn2] = Conns1 = open_connections(Config, [0, 0]), + [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 2 andalso + count_channels_of_user(Config, Username) =:= 10 + end), + + close_channels(Chans1 ++ Chans2), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn1, Conn2]), + ?assertEqual(0, count_connections_of_user(Config, Username)). + +single_node_single_user_clear_limits(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 3, 15), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + [Conn1, Conn2, Conn3] = Conns1 = open_connections(Config, [0, 0, 0]), + [_Chans1, Chans2, Chans3] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_channel_is_rejected(Conn1), + + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn2) andalso + is_process_alive(Conn3) + end), + + %% reach limit again + [Conn4] = open_connections(Config, [{0, Username}]), + Chans4 = [_|_] = open_channels(Conn4, 5), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 3 andalso + count_channels_of_user(Config, Username) =:= 15 + end), + + clear_all_user_limits(Config, Username), + + [Conn5, Conn6, Conn7] = Conns2 = open_connections(Config, [0, 0, 0]), + [Chans5, Chans6, Chans7] = [open_channels(Conn, 5) || Conn <- Conns2], + + close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++ Chans7), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4, Conn5, Conn6, Conn7]), + ?assertEqual(0, count_connections_of_user(Config, Username)), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +single_node_multiple_users_clear_limits(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + set_user_connection_and_channel_limit(Config, Username1, 0, 0), + set_user_connection_and_channel_limit(Config, Username2, 0, 0), + + ?assertEqual(0, count_connections_of_user(Config, Username1)), + ?assertEqual(0, count_connections_of_user(Config, Username2)), + ?assertEqual(0, count_channels_of_user(Config, Username1)), + ?assertEqual(0, count_channels_of_user(Config, Username2)), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config, 0, Username1), + expect_that_client_connection_is_rejected(Config, 0, Username2), + expect_that_client_connection_is_rejected(Config, 0, Username1), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username1, 1, 0), + set_user_connection_and_channel_limit(Config, Username2, 1, 0), + [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {0, Username2}]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 + end), + expect_that_client_channel_is_rejected(ConnA), + expect_that_client_channel_is_rejected(ConnB), + + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(ConnA) =:= false andalso + is_process_alive(ConnB) =:= false + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + clear_all_user_limits(Config, Username1), + set_user_channel_limit_only(Config, Username2, -1), + set_user_connection_limit_only(Config, Username2, -1), + + [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username1}, {0, Username1}]), + [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + + close_channels(Chans1 ++ Chans2), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn1, Conn2]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1). + +single_node_multiple_users_limit(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + set_user_connection_and_channel_limit(Config, Username1, 2, 10), + set_user_connection_and_channel_limit(Config, Username2, 2, 10), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [ + {0, Username1}, + {0, Username1}, + {0, Username2}, + {0, Username2}]), + + [_Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0, Username1), + expect_that_client_connection_is_rejected(Config, 0, Username2), + expect_that_client_channel_is_rejected(Conn1), + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn3) =:= true + end), + + [Conn5] = open_connections(Config, [0]), + Chans5 = [_|_] = open_channels(Conn5, 5), + + set_user_connection_and_channel_limit(Config, Username1, 5, 25), + set_user_connection_and_channel_limit(Config, Username2, -10, -50), + + [Conn6, Conn7, Conn8, Conn9, Conn10] = Conns2 = open_connections(Config, [ + {0, Username1}, + {0, Username1}, + {0, Username1}, + {0, Username2}, + {0, Username2}]), + + [Chans6, Chans7, Chans8, Chans9, Chans10] = [open_channels(Conn, 5) || Conn <- Conns2], + + close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++ + Chans7 ++ Chans8 ++ Chans9 ++ Chans10), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4, Conn5, Conn6, + Conn7, Conn8, Conn9, Conn10]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1), + + rabbit_ct_broker_helpers:delete_user(Config, Username1), + rabbit_ct_broker_helpers:delete_user(Config, Username2). + + +single_node_multiple_users_zero_limit(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + set_user_connection_and_channel_limit(Config, Username1, 0, 0), + set_user_connection_and_channel_limit(Config, Username2, 0, 0), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config, 0, Username1), + expect_that_client_connection_is_rejected(Config, 0, Username2), + expect_that_client_connection_is_rejected(Config, 0, Username1), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username1, 1, 0), + set_user_connection_and_channel_limit(Config, Username2, 1, 0), + [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {0, Username2}]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 + end), + expect_that_client_channel_is_rejected(ConnA), + expect_that_client_channel_is_rejected(ConnB), + + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(ConnA) =:= false andalso + is_process_alive(ConnB) =:= false + end), + + ?assertEqual(false, is_process_alive(ConnA)), + ?assertEqual(false, is_process_alive(ConnB)), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username1}, {0, Username1}]), + [Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + + close_channels(Chans1 ++ Chans2), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn1, Conn2]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1). + + +cluster_single_user_limit(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_limit_only(Config, Username, 2), + set_user_channel_limit_only(Config, Username, 10), + + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + %% here connections and channels are opened to different nodes + [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {1, Username}]), + [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0, Username), + expect_that_client_connection_is_rejected(Config, 1, Username), + expect_that_client_channel_is_rejected(Conn1), + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn2) =:= true + end), + + set_user_connection_and_channel_limit(Config, Username, 5, 25), + + [Conn3, Conn4] = Conns2 = open_connections(Config, [{0, Username}, {0, Username}]), + [Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns2], + + close_channels(Chans2 ++ Chans3 ++ Chans4), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + close_connections([Conn2, Conn3, Conn4]), + ?assertEqual(0, count_connections_of_user(Config, Username)), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +cluster_single_user_limit2(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 2, 10), + + ?assertEqual(0, count_connections_of_user(Config, Username)), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + %% here a limit is reached on one node first + [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {0, Username}]), + [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0, Username), + expect_that_client_connection_is_rejected(Config, 1, Username), + expect_that_client_channel_is_rejected(Conn1), + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn2) =:= true + end), + + set_user_connection_and_channel_limit(Config, Username, 5, 25), + + [Conn3, Conn4, Conn5, Conn6, {error, not_allowed}] = open_connections(Config, [ + {1, Username}, + {1, Username}, + {1, Username}, + {1, Username}, + {1, Username}]), + + [Chans3, Chans4, Chans5, Chans6, [{error, not_allowed}]] = + [open_channels(Conn, 1) || Conn <- [Conn3, Conn4, Conn5, Conn6, Conn1]], + + close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4, Conn5, Conn6]), + ?assertEqual(0, count_connections_of_user(Config, Username)), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + + +cluster_single_user_zero_limit(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 0, 0), + + ?assertEqual(0, count_connections_of_user(Config, Username)), + ?assertEqual(0, count_channels_of_user(Config, Username)), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config, 0), + expect_that_client_connection_is_rejected(Config, 1), + expect_that_client_connection_is_rejected(Config, 0), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username, 1, 0), + [ConnA] = open_connections(Config, [0]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 1 + end), + expect_that_client_channel_is_rejected(ConnA), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + ?assertEqual(false, is_process_alive(ConnA)), + + set_user_connection_and_channel_limit(Config, Username, -1, -1), + [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [0, 1, 0, 1]), + [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1], + + close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn1, Conn2, Conn3, Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +cluster_single_user_clear_limits(Config) -> + Username = proplists:get_value(rmq_username, Config), + set_user_connection_and_channel_limit(Config, Username, 2, 10), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 andalso + count_channels_of_user(Config, Username) =:= 0 + end), + + %% here a limit is reached on one node first + [Conn1, Conn2] = Conns1 = open_connections(Config, [{0, Username}, {0, Username}]), + [_Chans1, Chans2] = [open_channels(Conn, 5) || Conn <- Conns1], + + %% we've crossed the limit + expect_that_client_connection_is_rejected(Config, 0, Username), + expect_that_client_connection_is_rejected(Config, 1, Username), + expect_that_client_channel_is_rejected(Conn1), + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(Conn1) =:= false andalso + is_process_alive(Conn2) =:= true + end), + clear_all_user_limits(Config, Username), + + [Conn3, Conn4, Conn5, Conn6, Conn7] = open_connections(Config, [ + {1, Username}, + {1, Username}, + {1, Username}, + {1, Username}, + {1, Username}]), + + [Chans3, Chans4, Chans5, Chans6, Chans7] = + [open_channels(Conn, 1) || Conn <- [Conn3, Conn4, Conn5, Conn6, Conn7]], + + close_channels(Chans2 ++ Chans3 ++ Chans4 ++ Chans5 ++ Chans6 ++ Chans7), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username) =:= 0 + end), + + close_connections([Conn2, Conn3, Conn4, Conn5, Conn6, Conn7]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +cluster_multiple_users_clear_limits(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + set_user_connection_and_channel_limit(Config, Username1, 0, 0), + set_user_connection_and_channel_limit(Config, Username2, 0, 0), + + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config, 0, Username1), + expect_that_client_connection_is_rejected(Config, 0, Username2), + expect_that_client_connection_is_rejected(Config, 1, Username1), + expect_that_client_connection_is_rejected(Config, 1, Username2), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username1, 1, 0), + set_user_connection_and_channel_limit(Config, Username2, 1, 0), + [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {1, Username2}]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 1 andalso + count_connections_of_user(Config, Username2) =:= 1 + end), + expect_that_client_channel_is_rejected(ConnA), + + rabbit_ct_helpers:await_condition( + fun () -> + is_process_alive(ConnA) =:= false andalso + is_process_alive(ConnB) =:= true + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 1 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + close_connections([ConnB]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + ?assertEqual(false, is_process_alive(ConnB)), + + clear_all_user_limits(Config, Username1), + clear_all_user_limits(Config, Username2), + + [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [ + {0, Username1}, + {0, Username2}, + {1, Username1}, + {1, Username2}]), + + [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1], + + close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + close_connections([Conn1, Conn2, Conn3, Conn4]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1). + +cluster_multiple_users_zero_limit(Config) -> + Username1 = <<"guest1">>, + Username2 = <<"guest2">>, + + set_up_user(Config, Username1), + set_up_user(Config, Username2), + + set_user_connection_and_channel_limit(Config, Username1, 0, 0), + set_user_connection_and_channel_limit(Config, Username2, 0, 0), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 0 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + + %% with limit = 0 no connections are allowed + expect_that_client_connection_is_rejected(Config, 0, Username1), + expect_that_client_connection_is_rejected(Config, 0, Username2), + expect_that_client_connection_is_rejected(Config, 1, Username1), + expect_that_client_connection_is_rejected(Config, 1, Username2), + + %% with limit = 0 no channels are allowed + set_user_connection_and_channel_limit(Config, Username1, 1, 0), + set_user_connection_and_channel_limit(Config, Username2, 1, 0), + [ConnA, ConnB] = open_connections(Config, [{0, Username1}, {1, Username2}]), + + expect_that_client_channel_is_rejected(ConnA), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username1) =:= 0 andalso + count_connections_of_user(Config, Username2) =:= 1 + end), + rabbit_ct_helpers:await_condition( + fun () -> + count_channels_of_user(Config, Username1) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + ?assertEqual(false, is_process_alive(ConnA)), + ?assertEqual(true, is_process_alive(ConnB)), + close_connections([ConnB]), + rabbit_ct_helpers:await_condition( + fun () -> + count_connections_of_user(Config, Username2) =:= 0 andalso + count_channels_of_user(Config, Username2) =:= 0 + end), + ?assertEqual(false, is_process_alive(ConnB)), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1), + + [Conn1, Conn2, Conn3, Conn4] = Conns1 = open_connections(Config, [ + {0, Username1}, + {0, Username2}, + {1, Username1}, + {1, Username2}]), + + [Chans1, Chans2, Chans3, Chans4] = [open_channels(Conn, 5) || Conn <- Conns1], + + close_channels(Chans1 ++ Chans2 ++ Chans3 ++ Chans4), + ?assertEqual(0, count_channels_of_user(Config, Username1)), + ?assertEqual(0, count_channels_of_user(Config, Username2)), + + close_connections([Conn1, Conn2, Conn3, Conn4]), + ?assertEqual(0, count_connections_of_user(Config, Username1)), + ?assertEqual(0, count_connections_of_user(Config, Username2)), + + set_user_connection_and_channel_limit(Config, Username1, -1, -1), + set_user_connection_and_channel_limit(Config, Username2, -1, -1). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +open_connections(Config, NodesAndUsers) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, + Conns = lists:map(fun + ({Node, User}) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, + User, User); + (Node) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) + end, NodesAndUsers), + timer:sleep(100), + Conns. + +close_connections(Conns) -> + lists:foreach(fun + (Conn) -> + rabbit_ct_client_helpers:close_connection(Conn) + end, Conns). + +open_channels(Conn, N) -> + [open_channel(Conn) || _ <- lists:seq(1, N)]. + +open_channel(Conn) when is_pid(Conn) -> + try amqp_connection:open_channel(Conn) of + {ok, Ch} -> Ch + catch + _:_Error -> {error, not_allowed} + end. + +close_channels(Channels = [_|_]) -> + [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels]. + +count_connections_of_user(Config, Username) -> + count_connections_in(Config, Username, 0). +count_connections_in(Config, Username, NodeIndex) -> + count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username). + + count_channels_of_user(Config, Username) -> + count_channels_in(Config, Username, 0). + count_channels_in(Config, Username, NodeIndex) -> + count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username). + +count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + count_tracked_items_in, [{user, Username}]). + +connections_in(Config, Username) -> + connections_in(Config, 0, Username). +connections_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username). + +channels_in(Config, Username) -> + channels_in(Config, 0, Username). +channels_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username). + +tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list_of_user, [Username]). + +connections_on_node(Config) -> + connections_on_node(Config, 0). +connections_on_node(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), + tracked_items_on_node(Config, NodeIndex, rabbit_connection_tracking, Node). + +channels_on_node(Config) -> + channels_on_node(Config, 0). +channels_on_node(Config, NodeIndex) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), + tracked_items_on_node(Config, NodeIndex, rabbit_channel_tracking, Node). + +tracked_items_on_node(Config, NodeIndex, TrackingMod, NodeForListing) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list_on_node, [NodeForListing]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_connection_tracking). + +all_channels(Config) -> + all_channels(Config, 0). +all_channels(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_channel_tracking). + +all_tracked_items(Config, NodeIndex, TrackingMod) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list, []). + +set_up_user(Config, Username) -> + VHost = proplists:get_value(rmq_vhost, Config), + rabbit_ct_broker_helpers:add_user(Config, Username), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username, VHost), + set_user_connection_and_channel_limit(Config, Username, -1, -1). + +set_user_connection_and_channel_limit(Config, Username, ConnLimit, ChLimit) -> + set_user_connection_and_channel_limit(Config, 0, Username, ConnLimit, ChLimit). + +set_user_connection_and_channel_limit(Config, NodeIndex, Username, ConnLimit, ChLimit) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++ + ["{\"max-connections\": " ++ integer_to_list(ConnLimit) ++ "," ++ + " \"max-channels\": " ++ integer_to_list(ChLimit) ++ "}"]). + +set_user_connection_limit_only(Config, Username, ConnLimit) -> + set_user_connection_limit_only(Config, 0, Username, ConnLimit). + +set_user_connection_limit_only(Config, NodeIndex, Username, ConnLimit) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++ + ["{\"max-connections\": " ++ integer_to_list(ConnLimit) ++ "}"]). + +set_user_channel_limit_only(Config, Username, ChLimit) -> + set_user_channel_limit_only(Config, 0, Username, ChLimit). + +set_user_channel_limit_only(Config, NodeIndex, Username, ChLimit) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_user_limits, Node, [rabbit_data_coercion:to_list(Username)] ++ + ["{\"max-channels\": " ++ integer_to_list(ChLimit) ++ "}"]). + +clear_all_user_limits(Config, Username) -> + clear_all_user_limits(Config, 0, Username). +clear_all_user_limits(Config, NodeIndex, Username) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + clear_user_limits, Node, [rabbit_data_coercion:to_list(Username), "all"]). + +await_running_node_refresh(_Config, _NodeIndex) -> + timer:sleep(250). + +expect_that_client_connection_is_rejected(Config) -> + expect_that_client_connection_is_rejected(Config, 0). + +expect_that_client_connection_is_rejected(Config, NodeIndex) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex). + +expect_that_client_connection_is_rejected(Config, NodeIndex, User) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, User, User). + +expect_that_client_channel_is_rejected(Conn) -> + {error, not_allowed} = open_channel(Conn). diff --git a/test/per_user_connection_channel_limit_partitions_SUITE.erl b/test/per_user_connection_channel_limit_partitions_SUITE.erl new file mode 100644 index 0000000000..32af9ce9a1 --- /dev/null +++ b/test/per_user_connection_channel_limit_partitions_SUITE.erl @@ -0,0 +1,174 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(per_user_connection_channel_limit_partitions_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-import(rabbit_ct_client_helpers, [open_unmanaged_connection/2, + open_unmanaged_connection/3]). + +all() -> + [ + {group, net_ticktime_1} + ]. + +groups() -> + [ + {net_ticktime_1, [], [ + cluster_full_partition_with_autoheal + ]} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% see partitions_SUITE +-define(DELAY, 12000). + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:configure_dist_proxy/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(net_ticktime_1 = Group, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]), + init_per_multinode_group(Group, Config1, 3). + +init_per_multinode_group(_Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- + +cluster_full_partition_with_autoheal(Config) -> + Username = proplists:get_value(rmq_username, Config), + rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal), + + ?assertEqual(0, count_connections_in(Config, Username)), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% 6 connections, 2 per node + Conn1 = open_unmanaged_connection(Config, A), + Conn2 = open_unmanaged_connection(Config, A), + Conn3 = open_unmanaged_connection(Config, B), + Conn4 = open_unmanaged_connection(Config, B), + Conn5 = open_unmanaged_connection(Config, C), + Conn6 = open_unmanaged_connection(Config, C), + + _Chans1 = [_|_] = open_channels(Conn1, 5), + _Chans3 = [_|_] = open_channels(Conn3, 5), + _Chans5 = [_|_] = open_channels(Conn5, 5), + wait_for_count_connections_in(Config, Username, 6, 60000), + ?assertEqual(15, count_channels_in(Config, Username)), + + %% B drops off the network, non-reachable by either A or C + rabbit_ct_broker_helpers:block_traffic_between(A, B), + rabbit_ct_broker_helpers:block_traffic_between(B, C), + timer:sleep(?DELAY), + + %% A and C are still connected, so 4 connections are tracked + %% All connections to B are dropped + wait_for_count_connections_in(Config, Username, 4, 60000), + ?assertEqual(10, count_channels_in(Config, Username)), + + rabbit_ct_broker_helpers:allow_traffic_between(A, B), + rabbit_ct_broker_helpers:allow_traffic_between(B, C), + timer:sleep(?DELAY), + + %% during autoheal B's connections were dropped + wait_for_count_connections_in(Config, Username, 4, 60000), + ?assertEqual(10, count_channels_in(Config, Username)), + + lists:foreach(fun (Conn) -> + (catch rabbit_ct_client_helpers:close_connection(Conn)) + end, [Conn1, Conn2, Conn3, Conn4, + Conn5, Conn6]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + + passed. + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +wait_for_count_connections_in(Config, Username, Expected, Time) when Time =< 0 -> + ?assertMatch(Connections when length(Connections) == Expected, + connections_in(Config, Username)); +wait_for_count_connections_in(Config, Username, Expected, Time) -> + case connections_in(Config, Username) of + Connections when length(Connections) == Expected -> + ok; + _ -> + Sleep = 3000, + timer:sleep(Sleep), + wait_for_count_connections_in(Config, Username, Expected, Time - Sleep) + end. + +open_channels(Conn, N) -> + [begin + {ok, Ch} = amqp_connection:open_channel(Conn), + Ch + end || _ <- lists:seq(1, N)]. + +count_connections_in(Config, Username) -> + length(connections_in(Config, Username)). + +connections_in(Config, Username) -> + connections_in(Config, 0, Username). +connections_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username). + +count_channels_in(Config, Username) -> + Channels = channels_in(Config, Username), + length([Ch || Ch = #tracked_channel{username = Username0} <- Channels, + Username =:= Username0]). + +channels_in(Config, Username) -> + channels_in(Config, 0, Username). +channels_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username). + +tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list_of_user, [Username]). diff --git a/test/per_user_connection_channel_tracking_SUITE.erl b/test/per_user_connection_channel_tracking_SUITE.erl new file mode 100644 index 0000000000..674a2e4177 --- /dev/null +++ b/test/per_user_connection_channel_tracking_SUITE.erl @@ -0,0 +1,840 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(per_user_connection_channel_tracking_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +all() -> + [ + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} + ]. + +groups() -> + ClusterSize1Tests = [ + single_node_user_connection_channel_tracking, + single_node_user_deletion, + single_node_vhost_down_mimic, + single_node_vhost_deletion + ], + ClusterSize2Tests = [ + cluster_user_deletion, + cluster_vhost_down_mimic, + cluster_vhost_deletion, + cluster_node_removed + ], + [ + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests} + ]. + +suite() -> + [ + %% If a test hangs, no need to wait for 30 minutes. + {timetrap, {minutes, 8}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2). + +init_per_multinode_group(_Group, Config, NodeCount) -> + Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, NodeCount}, + {rmq_nodename_suffix, Suffix} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + clear_all_connection_tracking_tables(Config), + clear_all_channel_tracking_tables(Config), + Config. + +end_per_testcase(Testcase, Config) -> + clear_all_connection_tracking_tables(Config), + clear_all_channel_tracking_tables(Config), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +clear_all_connection_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_connection_tracking, + clear_tracking_tables, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +clear_all_channel_tracking_tables(Config) -> + [rabbit_ct_broker_helpers:rpc(Config, + N, + rabbit_channel_tracking, + clear_tracking_tables, + []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. + +%% ------------------------------------------------------------------- +%% Test cases. +%% ------------------------------------------------------------------- +single_node_user_connection_channel_tracking(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + [Chan1] = open_channels(Conn1, 1), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + [#tracked_channel{username = Username}] = channels_in(Config, Username), + ?assertEqual(true, is_process_alive(Conn1)), + ?assertEqual(true, is_process_alive(Chan1)), + close_channels([Chan1]), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Chan1)), + close_connections([Conn1]), + ?assertEqual(0, length(connections_in(Config, Username))), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + timer:sleep(100), + [#tracked_connection{username = Username2}] = connections_in(Config, Username2), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + [Conn3] = open_connections(Config, [0]), + Chans3 = [_|_] = open_channels(Conn3, 5), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn3)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans3], + + [Conn4] = open_connections(Config, [0]), + Chans4 = [_|_] = open_channels(Conn4, 5), + ?assertEqual(2, tracked_user_connection_count(Config, Username)), + ?assertEqual(10, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn4)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans4], + kill_connections([Conn4]), + [#tracked_connection{username = Username}] = connections_in(Config, Username), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn4)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans4], + + [Conn5] = open_connections(Config, [0]), + Chans5 = [_|_] = open_channels(Conn5, 7), + [Username, Username] = + lists:map(fun (#tracked_connection{username = U}) -> U end, + connections_in(Config, Username)), + ?assertEqual(12, count_channels_in(Config, Username)), + ?assertEqual(12, tracked_user_channel_count(Config, Username)), + ?assertEqual(2, tracked_user_connection_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn5)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans5], + + close_channels(Chans2 ++ Chans3 ++ Chans5), + ?assertEqual(0, length(all_channels(Config))), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + close_connections([Conn2, Conn3, Conn5]), + rabbit_ct_broker_helpers:delete_user(Config, Username2), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, length(all_connections(Config))). + +single_node_user_deletion(Config) -> + set_tracking_execution_timeout(Config, 100), + + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, Username2)), + ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, Username2)), + + rabbit_ct_broker_helpers:delete_user(Config, Username2), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, Username2)), + ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, Username2)), + + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +single_node_vhost_deletion(Config) -> + set_tracking_execution_timeout(Config, 100), + + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, Vhost)), + + rabbit_ct_broker_helpers:add_vhost(Config, Vhost). + +single_node_vhost_down_mimic(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{0, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + %% mimic vhost down event, while connections exist + mimic_vhost_down(Config, 0, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + +cluster_user_deletion(Config) -> + set_tracking_execution_timeout(Config, 0, 100), + set_tracking_execution_timeout(Config, 1, 100), + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), + ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [0]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), + ?assertEqual(true, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + + rabbit_ct_broker_helpers:delete_user(Config, Username2), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% ensure user entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_user_table(Config, 1, Username2)), + ?assertEqual(false, exists_in_tracked_channel_per_user_table(Config, 1, Username2)), + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +cluster_vhost_deletion(Config) -> + set_tracking_execution_timeout(Config, 0, 100), + set_tracking_execution_timeout(Config, 1, 100), + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(100, get_tracking_execution_timeout(Config, 0)), + ?assertEqual(100, get_tracking_execution_timeout(Config, 1)), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), + ?assertEqual(true, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), + + rabbit_ct_broker_helpers:delete_vhost(Config, Vhost), + timer:sleep(200), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1], + + %% ensure vhost entry is cleared after 'tracking_execution_timeout' + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 0, Vhost)), + ?assertEqual(false, exists_in_tracked_connection_per_vhost_table(Config, 1, Vhost)), + + rabbit_ct_broker_helpers:add_vhost(Config, Vhost), + rabbit_ct_broker_helpers:add_user(Config, Username), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username, Vhost). + +cluster_vhost_down_mimic(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + mimic_vhost_down(Config, 1, Vhost), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + %% gen_event notifies local handlers. remote connections still active + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + mimic_vhost_down(Config, 0, Vhost), + timer:sleep(100), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(false, is_process_alive(Conn1)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans1]. + +cluster_node_removed(Config) -> + Username = proplists:get_value(rmq_username, Config), + Username2 = <<"guest2">>, + + Vhost = proplists:get_value(rmq_vhost, Config), + + rabbit_ct_broker_helpers:add_user(Config, Username2), + rabbit_ct_broker_helpers:set_full_permissions(Config, Username2, Vhost), + + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, count_connections_in(Config, Username2)), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, count_channels_in(Config, Username2)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username2)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username2)), + + [Conn1] = open_connections(Config, [{0, Username}]), + Chans1 = [_|_] = open_channels(Conn1, 5), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + [Conn2] = open_connections(Config, [{1, Username2}]), + Chans2 = [_|_] = open_channels(Conn2, 5), + ?assertEqual(1, count_connections_in(Config, Username2)), + ?assertEqual(5, count_channels_in(Config, Username2)), + ?assertEqual(1, tracked_user_connection_count(Config, Username2)), + ?assertEqual(5, tracked_user_channel_count(Config, Username2)), + ?assertEqual(true, is_process_alive(Conn2)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans2], + + rabbit_ct_broker_helpers:stop_broker(Config, 1), + timer:sleep(200), + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1), + timer:sleep(200), + NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), + + DroppedConnTrackingTables = + rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName), + [?assertEqual( + {'EXIT', {aborted, {no_exists, Tab, all}}}, + catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables], + + DroppedChTrackingTables = + rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName), + [?assertEqual( + {'EXIT', {aborted, {no_exists, Tab, all}}}, + catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables], + + ?assertEqual(false, is_process_alive(Conn2)), + [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], + + ?assertEqual(1, count_connections_in(Config, Username)), + ?assertEqual(5, count_channels_in(Config, Username)), + ?assertEqual(1, tracked_user_connection_count(Config, Username)), + ?assertEqual(5, tracked_user_channel_count(Config, Username)), + ?assertEqual(true, is_process_alive(Conn1)), + [?assertEqual(true, is_process_alive(Ch)) || Ch <- Chans1], + + close_channels(Chans1), + ?assertEqual(0, count_channels_in(Config, Username)), + ?assertEqual(0, tracked_user_channel_count(Config, Username)), + + close_connections([Conn1]), + ?assertEqual(0, count_connections_in(Config, Username)), + ?assertEqual(0, tracked_user_connection_count(Config, Username)). + +%% ------------------------------------------------------------------- +%% Helpers +%% ------------------------------------------------------------------- + +open_connections(Config, NodesAndUsers) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, + Conns = lists:map(fun + ({Node, User}) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, + User, User); + (Node) -> + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) + end, NodesAndUsers), + timer:sleep(500), + Conns. + +close_connections(Conns) -> + lists:foreach(fun + (Conn) -> + rabbit_ct_client_helpers:close_connection(Conn) + end, Conns), + timer:sleep(500). + +kill_connections(Conns) -> + lists:foreach(fun + (Conn) -> + (catch exit(Conn, please_terminate)) + end, Conns), + timer:sleep(500). + +open_channels(Conn, N) -> + [begin + {ok, Ch} = amqp_connection:open_channel(Conn), + Ch + end || _ <- lists:seq(1, N)]. + +close_channels(Channels = [_|_]) -> + [rabbit_ct_client_helpers:close_channel(Ch) || Ch <- Channels]. + +count_connections_in(Config, Username) -> + length(connections_in(Config, Username)). + +connections_in(Config, Username) -> + connections_in(Config, 0, Username). +connections_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_connection_tracking, Username). + +count_channels_in(Config, Username) -> + Channels = channels_in(Config, Username), + length([Ch || Ch = #tracked_channel{username = Username0} <- Channels, + Username =:= Username0]). + +channels_in(Config, Username) -> + channels_in(Config, 0, Username). +channels_in(Config, NodeIndex, Username) -> + tracked_list_of_user(Config, NodeIndex, rabbit_channel_tracking, Username). + +tracked_list_of_user(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list_of_user, [Username]). + +tracked_user_connection_count(Config, Username) -> + tracked_user_connection_count(Config, 0, Username). +tracked_user_connection_count(Config, NodeIndex, Username) -> + count_user_tracked_items(Config, NodeIndex, rabbit_connection_tracking, Username). + +tracked_user_channel_count(Config, Username) -> + tracked_user_channel_count(Config, 0, Username). +tracked_user_channel_count(Config, NodeIndex, Username) -> + count_user_tracked_items(Config, NodeIndex, rabbit_channel_tracking, Username). + +count_user_tracked_items(Config, NodeIndex, TrackingMod, Username) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + count_tracked_items_in, [{user, Username}]). + +exists_in_tracked_connection_per_vhost_table(Config, VHost) -> + exists_in_tracked_connection_per_vhost_table(Config, 0, VHost). +exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1, + VHost). + +exists_in_tracked_connection_per_user_table(Config, Username) -> + exists_in_tracked_connection_per_user_table(Config, 0, Username). +exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1, + Username). + +exists_in_tracked_channel_per_user_table(Config, Username) -> + exists_in_tracked_channel_per_user_table(Config, 0, Username). +exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) -> + exists_in_tracking_table(Config, NodeIndex, + fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1, + Username). + +exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + Tab = TableNameFun(Node), + AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + mnesia, + dirty_all_keys, [Tab]), + lists:member(Key, AllKeys). + +mimic_vhost_down(Config, NodeIndex, VHost) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + rabbit_vhost, vhost_down, [VHost]). + +all_connections(Config) -> + all_connections(Config, 0). +all_connections(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_connection_tracking). + +all_channels(Config) -> + all_channels(Config, 0). +all_channels(Config, NodeIndex) -> + all_tracked_items(Config, NodeIndex, rabbit_channel_tracking). + +all_tracked_items(Config, NodeIndex, TrackingMod) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + TrackingMod, + list, []). + +set_up_vhost(Config, VHost) -> + rabbit_ct_broker_helpers:add_vhost(Config, VHost), + rabbit_ct_broker_helpers:set_full_permissions(Config, <<"guest">>, VHost), + set_vhost_connection_limit(Config, VHost, -1). + +set_vhost_connection_limit(Config, VHost, Count) -> + set_vhost_connection_limit(Config, 0, VHost, Count). + +set_vhost_connection_limit(Config, NodeIndex, VHost, Count) -> + Node = rabbit_ct_broker_helpers:get_node_config( + Config, NodeIndex, nodename), + ok = rabbit_ct_broker_helpers:control_action( + set_vhost_limits, Node, + ["{\"max-connections\": " ++ integer_to_list(Count) ++ "}"], + [{"-p", binary_to_list(VHost)}]). + +set_tracking_execution_timeout(Config, Timeout) -> + set_tracking_execution_timeout(Config, 0, Timeout). +set_tracking_execution_timeout(Config, NodeIndex, Timeout) -> + rabbit_ct_broker_helpers:rpc(Config, NodeIndex, + application, set_env, + [rabbit, tracking_execution_timeout, Timeout]). + +get_tracking_execution_timeout(Config) -> + get_tracking_execution_timeout(Config, 0). +get_tracking_execution_timeout(Config, NodeIndex) -> + {ok, Timeout} = rabbit_ct_broker_helpers:rpc( + Config, NodeIndex, + application, get_env, + [rabbit, tracking_execution_timeout]), + Timeout. + +await_running_node_refresh(_Config, _NodeIndex) -> + timer:sleep(250). + +expect_that_client_connection_is_rejected(Config) -> + expect_that_client_connection_is_rejected(Config, 0). + +expect_that_client_connection_is_rejected(Config, NodeIndex) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex). + +expect_that_client_connection_is_rejected(Config, NodeIndex, VHost) -> + {error, not_allowed} = + rabbit_ct_client_helpers:open_unmanaged_connection(Config, NodeIndex, VHost). diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index f3e37b0a0c..ef9e5c4554 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -137,7 +137,7 @@ clear_all_connection_tracking_tables(Config) -> [rabbit_ct_broker_helpers:rpc(Config, N, rabbit_connection_tracking, - clear_tracked_connection_tables_for_this_node, + clear_tracking_tables, []) || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename)]. %% ------------------------------------------------------------------- @@ -692,7 +692,7 @@ count_connections_in(Config, VHost, NodeIndex) -> timer:sleep(200), rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, - count_connections_in, [VHost]). + count_tracked_items_in, [{vhost, VHost}]). connections_in(Config, VHost) -> connections_in(Config, 0, VHost). diff --git a/test/per_vhost_connection_limit_partitions_SUITE.erl b/test/per_vhost_connection_limit_partitions_SUITE.erl index e4c6864ea0..2748d95592 100644 --- a/test/per_vhost_connection_limit_partitions_SUITE.erl +++ b/test/per_vhost_connection_limit_partitions_SUITE.erl @@ -140,7 +140,7 @@ count_connections_in(Config, VHost) -> count_connections_in(Config, VHost, NodeIndex) -> rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, - count_connections_in, [VHost]). + count_tracked_items_in, [{vhost, VHost}]). connections_in(Config, VHost) -> connections_in(Config, 0, VHost). @@ -148,22 +148,3 @@ connections_in(Config, NodeIndex, VHost) -> rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, list, [VHost]). - -connections_on_node(Config) -> - connections_on_node(Config, 0). -connections_on_node(Config, NodeIndex) -> - Node = rabbit_ct_broker_helpers:get_node_config(Config, NodeIndex, nodename), - rabbit_ct_broker_helpers:rpc(Config, NodeIndex, - rabbit_connection_tracking, - list_on_node, [Node]). -connections_on_node(Config, NodeIndex, NodeForListing) -> - rabbit_ct_broker_helpers:rpc(Config, NodeIndex, - rabbit_connection_tracking, - list_on_node, [NodeForListing]). - -all_connections(Config) -> - all_connections(Config, 0). -all_connections(Config, NodeIndex) -> - rabbit_ct_broker_helpers:rpc(Config, NodeIndex, - rabbit_connection_tracking, - list, []). diff --git a/test/unit_access_control_SUITE.erl b/test/unit_access_control_SUITE.erl index fcfd9e2bde..af8f481083 100644 --- a/test/unit_access_control_SUITE.erl +++ b/test/unit_access_control_SUITE.erl @@ -94,23 +94,17 @@ password_hashing1(_Config) -> rabbit_password_hashing_md5 = rabbit_auth_backend_internal:hashing_module_for_user( - #internal_user{}), + internal_user:new()), rabbit_password_hashing_md5 = rabbit_auth_backend_internal:hashing_module_for_user( - #internal_user{ - hashing_algorithm = undefined - }), + internal_user:new({hashing_algorithm, undefined})), rabbit_password_hashing_md5 = rabbit_auth_backend_internal:hashing_module_for_user( - #internal_user{ - hashing_algorithm = rabbit_password_hashing_md5 - }), + internal_user:new({hashing_algorithm, rabbit_password_hashing_md5})), rabbit_password_hashing_sha256 = rabbit_auth_backend_internal:hashing_module_for_user( - #internal_user{ - hashing_algorithm = rabbit_password_hashing_sha256 - }), + internal_user:new({hashing_algorithm, rabbit_password_hashing_sha256})), passed. @@ -211,23 +205,20 @@ set_tags_for_passwordless_user1(_Config) -> ok = rabbit_auth_backend_internal:set_tags(Username, [management], <<"acting-user">>), - ?assertMatch( - {ok, #internal_user{tags = [management]}}, - rabbit_auth_backend_internal:lookup_user(Username)), + {ok, User1} = rabbit_auth_backend_internal:lookup_user(Username), + ?assertEqual([management], internal_user:get_tags(User1)), ok = rabbit_auth_backend_internal:set_tags(Username, [management, policymaker], <<"acting-user">>), - ?assertMatch( - {ok, #internal_user{tags = [management, policymaker]}}, - rabbit_auth_backend_internal:lookup_user(Username)), + {ok, User2} = rabbit_auth_backend_internal:lookup_user(Username), + ?assertEqual([management, policymaker], internal_user:get_tags(User2)), ok = rabbit_auth_backend_internal:set_tags(Username, [], <<"acting-user">>), - ?assertMatch( - {ok, #internal_user{tags = []}}, - rabbit_auth_backend_internal:lookup_user(Username)), + {ok, User3} = rabbit_auth_backend_internal:lookup_user(Username), + ?assertEqual([], internal_user:get_tags(User3)), ok = rabbit_auth_backend_internal:delete_user(Username, <<"acting-user">>), diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 5906dfecb1..4e6ffe0d74 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -351,7 +351,7 @@ count_connections_in(Config, VHost, NodeIndex) -> timer:sleep(200), rabbit_ct_broker_helpers:rpc(Config, NodeIndex, rabbit_connection_tracking, - count_connections_in, [VHost]). + count_tracked_items_in, [{vhost, VHost}]). set_up_vhost(Config, VHost) -> rabbit_ct_broker_helpers:add_vhost(Config, VHost), |
