summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2016-09-12 15:15:54 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2016-09-12 15:15:54 +0100
commit9c577158c17653f31ba9925dc1338ee36a4ffd58 (patch)
treea128dba3d51934607a9369aa49a06dfa5a5b7d09
parentf887ff1a4b403a17040627fea1aa901ef23dbe30 (diff)
downloadrabbitmq-server-git-9c577158c17653f31ba9925dc1338ee36a4ffd58.tar.gz
Support direct connection tracking
-rw-r--r--src/rabbit_connection_tracking.erl2
-rw-r--r--src/rabbit_connection_tracking_handler.erl13
-rw-r--r--src/rabbit_direct.erl73
-rw-r--r--test/per_vhost_connection_limit_SUITE.erl82
4 files changed, 114 insertions, 56 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index ab945abc2f..b1d34ca516 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) ->
username = pget(user, EventDetails),
connected_at = pget(connected_at, EventDetails),
pid = pget(pid, EventDetails),
+ type = pget(type, EventDetails),
peer_host = pget(peer_host, EventDetails),
peer_port = pget(peer_port, EventDetails)}.
@@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{
{user, Username},
{connected_at, Ts},
{pid, self()},
+ {type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index fd1df8c88a..598fe686c3 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) ->
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = pget(name, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) ||
- #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)],
- {ok, State};
+ [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost]))
+ || Conn <- rabbit_connection_tracking:list(VHost)],
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
_Username = pget(name, Details),
%% TODO: force close and unregister connections from
@@ -106,3 +106,10 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ rabbit_networking:close_connection(Pid, Message);
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 061105c150..858681ecfd 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -65,35 +65,64 @@ list() ->
%%----------------------------------------------------------------------------
-connect({none, _}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end,
- VHost, Protocol, Pid, Infos);
+auth_fun({none, _}, _VHost) ->
+ fun () -> {ok, rabbit_auth_backend_dummy:user()} end;
-connect({Username, none}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end,
- VHost, Protocol, Pid, Infos);
+auth_fun({Username, none}, _VHost) ->
+ fun () -> rabbit_access_control:check_user_login(Username, []) end;
-connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
- connect0(fun () -> rabbit_access_control:check_user_login(
- Username, [{password, Password}, {vhost, VHost}]) end,
- VHost, Protocol, Pid, Infos).
+auth_fun({Username, Password}, VHost) ->
+ fun () ->
+ rabbit_access_control:check_user_login(
+ Username,
+ [{password, Password}, {vhost, VHost}])
+ end.
-connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
+connect(Creds, VHost, Protocol, Pid, Infos) ->
+ AuthFun = auth_fun(Creds, VHost),
case rabbit:is_running() of
- true -> case AuthFun() of
- {ok, User = #user{username = Username}} ->
- notify_auth_result(Username,
- user_authentication_success, []),
- connect1(User, VHost, Protocol, Pid, Infos);
- {refused, Username, Msg, Args} ->
- notify_auth_result(Username,
- user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}]),
- {error, {auth_failure, "Refused"}}
- end;
+ true ->
+ case is_over_connection_limit(VHost, Creds, Pid) of
+ true ->
+ {error, not_allowed};
+ false ->
+ case AuthFun() of
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
+ connect1(User, VHost, Protocol, Pid, Infos);
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
+ {error, {auth_failure, "Refused"}}
+ end
+ end;
false -> {error, broker_not_found_on_node}
end.
+is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
+ PrintedUsername = case Username of
+ none -> "";
+ _ -> Username
+ end,
+ try rabbit_connection_tracking:is_over_connection_limit(VHost) of
+ false -> false;
+ {true, Limit} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "access to vhost '~s' refused for user '~s': "
+ "connection limit (~p) is reached",
+ [Pid, VHost, PrintedUsername, Limit]),
+ true
+ catch
+ throw:{error, {no_such_vhost, VHost}} ->
+ rabbit_log_connection:error(
+ "Error on Direct connection ~p~n"
+ "vhost ~s not found", [Pid, VHost]),
+ true
+ end.
+
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, direct},
{name, case Username of none -> ''; _ -> Username end}] ++
diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl
index 4fae129a35..592e57c41a 100644
--- a/test/per_vhost_connection_limit_SUITE.erl
+++ b/test/per_vhost_connection_limit_SUITE.erl
@@ -24,35 +24,41 @@
all() ->
[
- {group, cluster_size_1},
- {group, cluster_size_2}
+ {group, cluster_size_1_network},
+ {group, cluster_size_2_network},
+ {group, cluster_size_1_direct},
+ {group, cluster_size_2_direct}
].
groups() ->
+ ClusterSize1Tests = [
+ most_basic_single_node_connection_count,
+ single_node_single_vhost_connection_count,
+ single_node_multiple_vhosts_connection_count,
+ single_node_list_in_vhost,
+ single_node_single_vhost_limit,
+ single_node_single_vhost_zero_limit,
+ single_node_multiple_vhosts_limit,
+ single_node_multiple_vhosts_zero_limit,
+ single_node_vhost_deletion_forces_connection_closure
+ ],
+ ClusterSize2Tests = [
+ most_basic_cluster_connection_count,
+ cluster_single_vhost_connection_count,
+ cluster_multiple_vhosts_connection_count,
+ cluster_node_restart_connection_count,
+ cluster_node_list_on_node,
+ cluster_single_vhost_limit,
+ cluster_single_vhost_limit2,
+ cluster_single_vhost_zero_limit,
+ cluster_multiple_vhosts_zero_limit,
+ cluster_vhost_deletion_forces_connection_closure
+ ],
[
- {cluster_size_1, [], [
- most_basic_single_node_connection_count,
- single_node_single_vhost_connection_count,
- single_node_multiple_vhosts_connection_count,
- single_node_list_in_vhost,
- single_node_single_vhost_limit,
- single_node_single_vhost_zero_limit,
- single_node_multiple_vhosts_limit,
- single_node_multiple_vhosts_zero_limit,
- single_node_vhost_deletion_forces_connection_closure
- ]},
- {cluster_size_2, [], [
- most_basic_cluster_connection_count,
- cluster_single_vhost_connection_count,
- cluster_multiple_vhosts_connection_count,
- cluster_node_restart_connection_count,
- cluster_node_list_on_node,
- cluster_single_vhost_limit,
- cluster_single_vhost_limit2,
- cluster_single_vhost_zero_limit,
- cluster_multiple_vhosts_zero_limit,
- cluster_vhost_deletion_forces_connection_closure
- ]},
+ {cluster_size_1_network, [], ClusterSize1Tests},
+ {cluster_size_2_network, [], ClusterSize2Tests},
+ {cluster_size_1_direct, [], ClusterSize1Tests},
+ {cluster_size_2_direct, [], ClusterSize2Tests},
{cluster_rename, [], [
vhost_limit_after_node_renamed
]}
@@ -80,10 +86,19 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
-init_per_group(cluster_size_1, Config) ->
- init_per_multinode_group(cluster_size_1, Config, 1);
-init_per_group(cluster_size_2, Config) ->
- init_per_multinode_group(cluster_size_2, Config, 2);
+init_per_group(cluster_size_1_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_1_network, Config1, 1);
+init_per_group(cluster_size_2_network, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]),
+ init_per_multinode_group(cluster_size_2_network, Config1, 2);
+init_per_group(cluster_size_1_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_1_direct, Config1, 1);
+init_per_group(cluster_size_2_direct, Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]),
+ init_per_multinode_group(cluster_size_2_direct, Config1, 2);
+
init_per_group(cluster_rename, Config) ->
init_per_multinode_group(cluster_rename, Config, 2).
@@ -706,12 +721,17 @@ vhost_limit_after_node_renamed(Config) ->
%% -------------------------------------------------------------------
open_connections(Config, NodesAndVHosts) ->
+ % Randomly select connection type
+ OpenConnectionFun = case ?config(connection_type, Config) of
+ network -> open_unmanaged_connection;
+ direct -> open_unmanaged_connection_direct
+ end,
Conns = lists:map(fun
({Node, VHost}) ->
- rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node,
VHost);
(Node) ->
- rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node)
+ rabbit_ct_client_helpers:OpenConnectionFun(Config, Node)
end, NodesAndVHosts),
timer:sleep(500),
Conns.