diff options
| author | Anupama Singh <anupamasingh31@gmail.com> | 2020-06-08 16:38:19 +0200 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-09-02 04:28:58 +0300 |
| commit | faa63c3089bb8ddf2643f0c824638a0fc2c9d90e (patch) | |
| tree | 7bf4b4031ae536f4d71c2493110837567334e22e | |
| parent | 6ee55529459d9f17af1d47e56b7e11cfdcaa8c5f (diff) | |
| download | rabbitmq-server-git-faa63c3089bb8ddf2643f0c824638a0fc2c9d90e.tar.gz | |
User connection/channel limit for direct connections
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 58 |
2 files changed, 46 insertions, 26 deletions
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 12f71d9e97..14f315d1e4 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -780,14 +780,14 @@ set_user_limits(Username, Defn) -> end. validate_parameters_and_update_limit(Username, Term) -> - case rabbit_parameter_validation:proplist(<<"user-limits">>, - user_limit_validation(), Term) of - [ok] -> + case flatten_errors(rabbit_parameter_validation:proplist( + <<"user-limits">>, user_limit_validation(), Term)) of + ok -> update_user(Username, fun(User = #internal_user{limits = Limits}) -> User#internal_user{ limits = maps:merge(Limits, Term)} end); - [{error, Reason, Arguments}] -> + {errors, [{Reason, Arguments}]} -> {error_string, rabbit_misc:format(Reason, Arguments)} end. @@ -801,6 +801,12 @@ clear_user_limits(Username, LimitType) -> limits = maps:remove(LimitType, Limits)} end). +flatten_errors(L) -> + case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of + [] -> ok; + E -> {errors, E} + end. + %%---------------------------------------------------------------------------- %% Listing diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 070dc7bfee..98eef21104 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -176,20 +176,27 @@ notify_auth_result(Username, AuthResult, ExtraProps) -> ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -connect1(User, VHost, Protocol, Pid, Infos) -> - % Note: peer_host can be either a tuple or - % a binary if reverse_dns_lookups is enabled - PeerHost = proplists:get_value(peer_host, Infos), - AuthzContext = proplists:get_value(variable_map, Infos, #{}), - try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_core_metrics:connection_created(Pid, Infos), - rabbit_event:notify(connection_created, Infos), - {ok, {User, rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = Reason = not_allowed} -> - {error, Reason} - end. +connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) -> + case rabbit_auth_backend_internal:is_over_connection_limit(Username) of + false -> + % Note: peer_host can be either a tuple or + % a binary if reverse_dns_lookups is enabled + PeerHost = proplists:get_value(peer_host, Infos), + AuthzContext = proplists:get_value(variable_map, Infos, #{}), + try rabbit_access_control:check_vhost_access(User, VHost, + {ip, PeerHost}, AuthzContext) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_core_metrics:connection_created(Pid, Infos), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = Reason = not_allowed} -> + {error, Reason} + end; + {true, Limit} -> + {error, rabbit_misc:format("Connection refused for user ~s. User " + "connection limit (~p) is reached", [Username, Limit])} + end. -spec start_channel (rabbit_channel:channel_number(), pid(), pid(), string(), @@ -197,14 +204,21 @@ connect1(User, VHost, Protocol, Pid, Infos) -> rabbit_framing:amqp_table(), pid(), any()) -> {'ok', pid()}. -start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, - VHost, Capabilities, Collector, AmqpParams) -> - {ok, _, {ChannelPid, _}} = - supervisor2:start_child( - rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, Collector, AmqpParams}]), - {ok, ChannelPid}. +start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, + User = #user{username = Username}, VHost, Capabilities, + Collector, AmqpParams) -> + case rabbit_auth_backend_internal:is_over_channel_limit(Username) of + false -> + {ok, _, {ChannelPid, _}} = + supervisor2:start_child( + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol, + User, VHost, Capabilities, Collector, AmqpParams}]), + {ok, ChannelPid}; + {true, Limit} -> {error, rabbit_misc:format("Not allowed. Number of " + "channels opened for user (~w) has reached the " + "maximum allowed limit of (~w)", [Username, Limit])} + end. -spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'. |
