summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnupama Singh <anupamasingh31@gmail.com>2020-06-08 16:38:19 +0200
committerMichael Klishin <michael@clojurewerkz.org>2020-09-02 04:28:58 +0300
commitfaa63c3089bb8ddf2643f0c824638a0fc2c9d90e (patch)
tree7bf4b4031ae536f4d71c2493110837567334e22e
parent6ee55529459d9f17af1d47e56b7e11cfdcaa8c5f (diff)
downloadrabbitmq-server-git-faa63c3089bb8ddf2643f0c824638a0fc2c9d90e.tar.gz
User connection/channel limit for direct connections
-rw-r--r--src/rabbit_auth_backend_internal.erl14
-rw-r--r--src/rabbit_direct.erl58
2 files changed, 46 insertions, 26 deletions
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 12f71d9e97..14f315d1e4 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -780,14 +780,14 @@ set_user_limits(Username, Defn) ->
end.
validate_parameters_and_update_limit(Username, Term) ->
- case rabbit_parameter_validation:proplist(<<"user-limits">>,
- user_limit_validation(), Term) of
- [ok] ->
+ case flatten_errors(rabbit_parameter_validation:proplist(
+ <<"user-limits">>, user_limit_validation(), Term)) of
+ ok ->
update_user(Username, fun(User = #internal_user{limits = Limits}) ->
User#internal_user{
limits = maps:merge(Limits, Term)}
end);
- [{error, Reason, Arguments}] ->
+ {errors, [{Reason, Arguments}]} ->
{error_string, rabbit_misc:format(Reason, Arguments)}
end.
@@ -801,6 +801,12 @@ clear_user_limits(Username, LimitType) ->
limits = maps:remove(LimitType, Limits)}
end).
+flatten_errors(L) ->
+ case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
+ [] -> ok;
+ E -> {errors, E}
+ end.
+
%%----------------------------------------------------------------------------
%% Listing
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 070dc7bfee..98eef21104 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -176,20 +176,27 @@ notify_auth_result(Username, AuthResult, ExtraProps) ->
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
-connect1(User, VHost, Protocol, Pid, Infos) ->
- % Note: peer_host can be either a tuple or
- % a binary if reverse_dns_lookups is enabled
- PeerHost = proplists:get_value(peer_host, Infos),
- AuthzContext = proplists:get_value(variable_map, Infos, #{}),
- try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}, AuthzContext) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_core_metrics:connection_created(Pid, Infos),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User, rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = Reason = not_allowed} ->
- {error, Reason}
- end.
+connect1(User = #user{username = Username}, VHost, Protocol, Pid, Infos) ->
+ case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
+ false ->
+ % Note: peer_host can be either a tuple or
+ % a binary if reverse_dns_lookups is enabled
+ PeerHost = proplists:get_value(peer_host, Infos),
+ AuthzContext = proplists:get_value(variable_map, Infos, #{}),
+ try rabbit_access_control:check_vhost_access(User, VHost,
+ {ip, PeerHost}, AuthzContext) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_core_metrics:connection_created(Pid, Infos),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = Reason = not_allowed} ->
+ {error, Reason}
+ end;
+ {true, Limit} ->
+ {error, rabbit_misc:format("Connection refused for user ~s. User "
+ "connection limit (~p) is reached", [Username, Limit])}
+ end.
-spec start_channel
(rabbit_channel:channel_number(), pid(), pid(), string(),
@@ -197,14 +204,21 @@ connect1(User, VHost, Protocol, Pid, Infos) ->
rabbit_framing:amqp_table(), pid(), any()) ->
{'ok', pid()}.
-start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, Collector, AmqpParams) ->
- {ok, _, {ChannelPid, _}} =
- supervisor2:start_child(
- rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, Collector, AmqpParams}]),
- {ok, ChannelPid}.
+start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol,
+ User = #user{username = Username}, VHost, Capabilities,
+ Collector, AmqpParams) ->
+ case rabbit_auth_backend_internal:is_over_channel_limit(Username) of
+ false ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, ConnPid, ConnName, Protocol,
+ User, VHost, Capabilities, Collector, AmqpParams}]),
+ {ok, ChannelPid};
+ {true, Limit} -> {error, rabbit_misc:format("Not allowed. Number of "
+ "channels opened for user (~w) has reached the "
+ "maximum allowed limit of (~w)", [Username, Limit])}
+ end.
-spec disconnect(pid(), rabbit_event:event_props()) -> 'ok'.