diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-09-12 15:15:54 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-09-12 15:15:54 +0100 |
| commit | 9c577158c17653f31ba9925dc1338ee36a4ffd58 (patch) | |
| tree | a128dba3d51934607a9369aa49a06dfa5a5b7d09 | |
| parent | f887ff1a4b403a17040627fea1aa901ef23dbe30 (diff) | |
| download | rabbitmq-server-git-9c577158c17653f31ba9925dc1338ee36a4ffd58.tar.gz | |
Support direct connection tracking
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 73 | ||||
| -rw-r--r-- | test/per_vhost_connection_limit_SUITE.erl | 82 |
4 files changed, 114 insertions, 56 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index ab945abc2f..b1d34ca516 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -315,6 +315,7 @@ tracked_connection_from_connection_created(EventDetails) -> username = pget(user, EventDetails), connected_at = pget(connected_at, EventDetails), pid = pget(pid, EventDetails), + type = pget(type, EventDetails), peer_host = pget(peer_host, EventDetails), peer_port = pget(peer_port, EventDetails)}. @@ -333,5 +334,6 @@ tracked_connection_from_connection_state(#connection{ {user, Username}, {connected_at, Ts}, {pid, self()}, + {type, network}, {peer_port, PeerPort}, {peer_host, PeerHost}]). diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index fd1df8c88a..598fe686c3 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -77,9 +77,9 @@ handle_event(#event{type = connection_closed, props = Details}, State) -> handle_event(#event{type = vhost_deleted, props = Details}, State) -> VHost = pget(name, Details), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), - [rabbit_networking:close_connection(Pid, rabbit_misc:format("vhost '~s' is deleted", [VHost])) || - #tracked_connection{pid = Pid} <- rabbit_connection_tracking:list(VHost)], - {ok, State}; + [close_connection(Conn, rabbit_misc:format("vhost '~s' is deleted", [VHost])) + || Conn <- rabbit_connection_tracking:list(VHost)], + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> _Username = pget(name, Details), %% TODO: force close and unregister connections from @@ -106,3 +106,10 @@ terminate(_Arg, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> + rabbit_networking:close_connection(Pid, Message); +close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> + %% Do an RPC call to the node running the direct client. + Node = node(Pid), + rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 061105c150..858681ecfd 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -65,35 +65,64 @@ list() -> %%---------------------------------------------------------------------------- -connect({none, _}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> {ok, rabbit_auth_backend_dummy:user()} end, - VHost, Protocol, Pid, Infos); +auth_fun({none, _}, _VHost) -> + fun () -> {ok, rabbit_auth_backend_dummy:user()} end; -connect({Username, none}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login(Username, []) end, - VHost, Protocol, Pid, Infos); +auth_fun({Username, none}, _VHost) -> + fun () -> rabbit_access_control:check_user_login(Username, []) end; -connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_login( - Username, [{password, Password}, {vhost, VHost}]) end, - VHost, Protocol, Pid, Infos). +auth_fun({Username, Password}, VHost) -> + fun () -> + rabbit_access_control:check_user_login( + Username, + [{password, Password}, {vhost, VHost}]) + end. -connect0(AuthFun, VHost, Protocol, Pid, Infos) -> +connect(Creds, VHost, Protocol, Pid, Infos) -> + AuthFun = auth_fun(Creds, VHost), case rabbit:is_running() of - true -> case AuthFun() of - {ok, User = #user{username = Username}} -> - notify_auth_result(Username, - user_authentication_success, []), - connect1(User, VHost, Protocol, Pid, Infos); - {refused, Username, Msg, Args} -> - notify_auth_result(Username, - user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}]), - {error, {auth_failure, "Refused"}} - end; + true -> + case is_over_connection_limit(VHost, Creds, Pid) of + true -> + {error, not_allowed}; + false -> + case AuthFun() of + {ok, User = #user{username = Username}} -> + notify_auth_result(Username, + user_authentication_success, []), + connect1(User, VHost, Protocol, Pid, Infos); + {refused, Username, Msg, Args} -> + notify_auth_result(Username, + user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}]), + {error, {auth_failure, "Refused"}} + end + end; false -> {error, broker_not_found_on_node} end. +is_over_connection_limit(VHost, {Username, _Password}, Pid) -> + PrintedUsername = case Username of + none -> ""; + _ -> Username + end, + try rabbit_connection_tracking:is_over_connection_limit(VHost) of + false -> false; + {true, Limit} -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "access to vhost '~s' refused for user '~s': " + "connection limit (~p) is reached", + [Pid, VHost, PrintedUsername, Limit]), + true + catch + throw:{error, {no_such_vhost, VHost}} -> + rabbit_log_connection:error( + "Error on Direct connection ~p~n" + "vhost ~s not found", [Pid, VHost]), + true + end. + notify_auth_result(Username, AuthResult, ExtraProps) -> EventProps = [{connection_type, direct}, {name, case Username of none -> ''; _ -> Username end}] ++ diff --git a/test/per_vhost_connection_limit_SUITE.erl b/test/per_vhost_connection_limit_SUITE.erl index 4fae129a35..592e57c41a 100644 --- a/test/per_vhost_connection_limit_SUITE.erl +++ b/test/per_vhost_connection_limit_SUITE.erl @@ -24,35 +24,41 @@ all() -> [ - {group, cluster_size_1}, - {group, cluster_size_2} + {group, cluster_size_1_network}, + {group, cluster_size_2_network}, + {group, cluster_size_1_direct}, + {group, cluster_size_2_direct} ]. groups() -> + ClusterSize1Tests = [ + most_basic_single_node_connection_count, + single_node_single_vhost_connection_count, + single_node_multiple_vhosts_connection_count, + single_node_list_in_vhost, + single_node_single_vhost_limit, + single_node_single_vhost_zero_limit, + single_node_multiple_vhosts_limit, + single_node_multiple_vhosts_zero_limit, + single_node_vhost_deletion_forces_connection_closure + ], + ClusterSize2Tests = [ + most_basic_cluster_connection_count, + cluster_single_vhost_connection_count, + cluster_multiple_vhosts_connection_count, + cluster_node_restart_connection_count, + cluster_node_list_on_node, + cluster_single_vhost_limit, + cluster_single_vhost_limit2, + cluster_single_vhost_zero_limit, + cluster_multiple_vhosts_zero_limit, + cluster_vhost_deletion_forces_connection_closure + ], [ - {cluster_size_1, [], [ - most_basic_single_node_connection_count, - single_node_single_vhost_connection_count, - single_node_multiple_vhosts_connection_count, - single_node_list_in_vhost, - single_node_single_vhost_limit, - single_node_single_vhost_zero_limit, - single_node_multiple_vhosts_limit, - single_node_multiple_vhosts_zero_limit, - single_node_vhost_deletion_forces_connection_closure - ]}, - {cluster_size_2, [], [ - most_basic_cluster_connection_count, - cluster_single_vhost_connection_count, - cluster_multiple_vhosts_connection_count, - cluster_node_restart_connection_count, - cluster_node_list_on_node, - cluster_single_vhost_limit, - cluster_single_vhost_limit2, - cluster_single_vhost_zero_limit, - cluster_multiple_vhosts_zero_limit, - cluster_vhost_deletion_forces_connection_closure - ]}, + {cluster_size_1_network, [], ClusterSize1Tests}, + {cluster_size_2_network, [], ClusterSize2Tests}, + {cluster_size_1_direct, [], ClusterSize1Tests}, + {cluster_size_2_direct, [], ClusterSize2Tests}, {cluster_rename, [], [ vhost_limit_after_node_renamed ]} @@ -80,10 +86,19 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(cluster_size_1, Config) -> - init_per_multinode_group(cluster_size_1, Config, 1); -init_per_group(cluster_size_2, Config) -> - init_per_multinode_group(cluster_size_2, Config, 2); +init_per_group(cluster_size_1_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_1_network, Config1, 1); +init_per_group(cluster_size_2_network, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, network}]), + init_per_multinode_group(cluster_size_2_network, Config1, 2); +init_per_group(cluster_size_1_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_1_direct, Config1, 1); +init_per_group(cluster_size_2_direct, Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, [{connection_type, direct}]), + init_per_multinode_group(cluster_size_2_direct, Config1, 2); + init_per_group(cluster_rename, Config) -> init_per_multinode_group(cluster_rename, Config, 2). @@ -706,12 +721,17 @@ vhost_limit_after_node_renamed(Config) -> %% ------------------------------------------------------------------- open_connections(Config, NodesAndVHosts) -> + % Randomly select connection type + OpenConnectionFun = case ?config(connection_type, Config) of + network -> open_unmanaged_connection; + direct -> open_unmanaged_connection_direct + end, Conns = lists:map(fun ({Node, VHost}) -> - rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node, VHost); (Node) -> - rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node) + rabbit_ct_client_helpers:OpenConnectionFun(Config, Node) end, NodesAndVHosts), timer:sleep(500), Conns. |
