summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-09-12 20:16:58 +0400
committerGitHub <noreply@github.com>2016-09-12 20:16:58 +0400
commitda7f18737edbda4752a3ec589f031e5b1f76e0a6 (patch)
tree0e519ecb14f6043475edf3f7127f243b45926178 /src
parentade8bab28353d691a1418daee4130bacd6830263 (diff)
parent9c577158c17653f31ba9925dc1338ee36a4ffd58 (diff)
downloadrabbitmq-server-git-da7f18737edbda4752a3ec589f031e5b1f76e0a6.tar.gz
Merge pull request #958 from rabbitmq/rabbitmq-server-948
Support direct connection tracking
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_connection_tracking.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl13
-rw-r--r--src/rabbit_direct.erl73
3 files changed, 63 insertions, 25 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index ab945abc2f..b1d34ca516 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) ->
username = pget(user, EventDetails),
connected_at = pget(connected_at, EventDetails),
pid = pget(pid, EventDetails),
+ type = pget(type, EventDetails),
peer_host = pget(peer_host, EventDetails),
peer_port = pget(peer_port, EventDetails)}.
@@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{
{user, Username},
{connected_at, Ts},
{pid, self()},
+ {type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index fd1df8c88a..598fe686c3 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = pget(name, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) ||
- #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)],
- {ok, State};
+ [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost]))
+ || Conn <- rabbit_connection_tracking:list(VHost)],
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
_Username = pget(name, Details),
%% TODO: force close and unregister connections from
@@ -106,3 +106,10 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ rabbit_networking:close_connection(Pid, Message);
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 061105c150..858681ecfd 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -65,35 +65,64 @@ list() ->
%%----------------------------------------------------------------------------
-connect({none, _}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
- VHost, Protocol, Pid, Infos);
+auth_fun({none, _}, _VHost) ->
+ fun () -> {ok, rabbit_auth_backend_dummy:user()} end;
-connect({Username, none}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end,
- VHost, Protocol, Pid, Infos);
+auth_fun({Username, none}, _VHost) ->
+ fun () -> rabbit_access_control:check_user_login(Username, []) end;
-connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(
- Username, [{password, Password}, {vhost, VHost}]) end,
- VHost, Protocol, Pid, Infos).
+auth_fun({Username, Password}, VHost) ->
+ fun () ->
+ rabbit_access_control:check_user_login(
+ Username,
+ [{password, Password}, {vhost, VHost}])
+ end.
-connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
+connect(Creds, VHost, Protocol, Pid, Infos) ->
+ AuthFun = auth_fun(Creds, VHost),
case rabbit:is_running() of
- true -> case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
- end;
+ true ->
+ case is_over_connection_limit(VHost, Creds, Pid) of
+ true ->
+ {error, not_allowed};
+ false ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
+ end;
false -> {error, broker_not_found_on_node}
end.
+is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ false -> false;
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "connection limit (~p) is reached",
+ [Pid, VHost, PrintedUsername, Limit]),
+ true
+ catch
+ throw:{error, {no_such_vhost, VHost}} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "vhost ~s not found", [Pid, VHost]),
+ true
+ end.
+
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, direct},
{name, case Username of none -> ''; _ -> Username end}] ++