diff options
| author | Ayanda-D <ayanda.dube@erlang-solutions.com> | 2020-06-04 10:59:52 +0100 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-08-24 20:14:28 +0300 |
| commit | dba05644b9be347db78ba48c82375264982f92e6 (patch) | |
| tree | f23edaabd3251c8e79a0742fa646ff0838b84696 | |
| parent | 716d293e0dfc727700dfb93be64081c5cbf6dd5a (diff) | |
| download | rabbitmq-server-git-dba05644b9be347db78ba48c82375264982f92e6.tar.gz | |
Introduce per-user connection tracking
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 341 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 21 |
2 files changed, 194 insertions, 168 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index 9bfe36b925..c9647acbe4 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -7,8 +7,6 @@ -module(rabbit_connection_tracking). --behaviour(gen_server). - %% Abstracts away how tracked connection records are stored %% and queried. %% @@ -17,20 +15,40 @@ %% * rabbit_connection_tracking_handler %% * rabbit_reader %% * rabbit_event +-behaviour(rabbit_tracking). -export([boot/0, - ensure_tracked_connections_table_for_node/1, + update_tracked/1, + handle_cast/1, + register_tracked/1, + unregister_tracked/1, + count_tracked_items_in/1, + clear_tracking_tables/0, + shutdown_tracked_items/2]). + +-export([ensure_tracked_connections_table_for_node/1, ensure_per_vhost_tracked_connections_table_for_node/1, + ensure_per_user_tracked_connections_table_for_node/1, + ensure_tracked_connections_table_for_this_node/0, ensure_per_vhost_tracked_connections_table_for_this_node/0, - tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, - delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, + ensure_per_user_tracked_connections_table_for_this_node/0, + + tracked_connection_table_name_for/1, + tracked_connection_per_vhost_table_name_for/1, + tracked_connection_per_user_table_name_for/1, + + delete_tracked_connections_table_for_node/1, + delete_per_vhost_tracked_connections_table_for_node/1, + delete_per_user_tracked_connections_table_for_node/1, + delete_tracked_connection_user_entry/1, + delete_tracked_connection_vhost_entry/1, + clear_tracked_connection_tables_for_this_node/0, - register_connection/1, unregister_connection/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, - count_connections_in/1, lookup/1, count/0]). @@ -38,36 +56,47 @@ -import(rabbit_misc, [pget/2]). --export([start_link/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -export([close_connections/3]). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - %% -%% GenServer API +%% API %% -init([]) -> - {ok, nostate}. +%% Behaviour callbacks + +-spec boot() -> ok. + +%% Sets up and resets connection tracking tables for this +%% node. +boot() -> + ensure_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for connection tracking on this node: ~p", + [tracked_connection_table_name_for(node())]), + ensure_per_vhost_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", + [tracked_connection_per_vhost_table_name_for(node())]), + ensure_per_user_tracked_connections_table_for_this_node(), + rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p", + [tracked_connection_per_user_table_name_for(node())]), + clear_tracking_tables(), + ok. -handle_call(_Msg, _From, nostate) -> - {reply, ok, nostate}. +-spec update_tracked(term()) -> ok. +update_tracked(Event) -> + spawn(?MODULE, handle_cast, [Event]). -handle_cast({connection_created, Details}, nostate) -> +%% Asynchronously handle update events +-spec handle_cast(term()) -> ok. + +handle_cast({connection_created, Details}) -> ThisNode = node(), case pget(node, Details) of ThisNode -> TConn = tracked_connection_from_connection_created(Details), ConnId = TConn#tracked_connection.id, try - register_connection(TConn) + register_tracked(TConn) catch error:{no_exists, _} -> Msg = "Could not register connection ~p for tracking, " @@ -82,84 +111,114 @@ handle_cast({connection_created, Details}, nostate) -> _OtherNode -> %% ignore ok - end, - {noreply, nostate}; -handle_cast({connection_closed, Details}, nostate) -> + end; +handle_cast({connection_closed, Details}) -> ThisNode = node(), case pget(node, Details) of ThisNode -> %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, %% {pid,<0.1774.0>}, %% {node, rabbit@hostname}] - unregister_connection( - {pget(node, Details), - pget(name, Details)}); + unregister_tracked( + rabbit_tracking:id(ThisNode, pget(name, Details))); _OtherNode -> %% ignore ok - end, - {noreply, nostate}; -handle_cast({vhost_deleted, Details}, nostate) -> + end; +handle_cast({vhost_deleted, Details}) -> VHost = pget(name, Details), + %% Schedule vhost entry deletion, allowing time for connections to close + _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_connection_vhost_entry, [VHost]), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), - close_connections(rabbit_connection_tracking:list(VHost), - rabbit_misc:format("vhost '~s' is deleted", [VHost])), - {noreply, nostate}; + shutdown_tracked_items( + rabbit_connection_tracking:list(VHost), + rabbit_misc:format("vhost '~s' is deleted", [VHost])); %% Note: under normal circumstances this will be called immediately %% after the vhost_deleted above. Therefore we should be careful about %% what we log and be more defensive. -handle_cast({vhost_down, Details}, nostate) -> +handle_cast({vhost_down, Details}) -> VHost = pget(name, Details), Node = pget(node, Details), rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" " because the vhost is stopping", [VHost, Node]), - close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), - rabbit_misc:format("vhost '~s' is down", [VHost])), - {noreply, nostate}; -handle_cast({user_deleted, Details}, nostate) -> + shutdown_tracked_items( + rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])); +handle_cast({user_deleted, Details}) -> Username = pget(name, Details), + %% Schedule user entry deletion, allowing time for connections to close + _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_connection_user_entry, [Username]), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), - close_connections(rabbit_connection_tracking:list_of_user(Username), - rabbit_misc:format("user '~s' is deleted", [Username])), - {noreply, nostate}; + shutdown_tracked_items( + rabbit_connection_tracking:list_of_user(Username), + rabbit_misc:format("user '~s' is deleted", [Username])); %% A node had been deleted from the cluster. -handle_cast({node_deleted, Details}, nostate) -> +handle_cast({node_deleted, Details}) -> Node = pget(node, Details), rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), - rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node), - rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node), - {noreply, nostate}; -handle_cast(_Msg, nostate) -> - {noreply, nostate}. + delete_tracked_connections_table_for_node(Node), + delete_per_vhost_tracked_connections_table_for_node(Node), + delete_per_user_tracked_connections_table_for_node(Node). -handle_info(_Info, nostate) -> - {noreply, nostate}. +-spec register_tracked(rabbit_types:tracked_connection()) -> ok. +-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]). -terminate(_Reason, nostate) -> +register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), + %% upsert + case mnesia:dirty_read(TableName, ConnId) of + [] -> + mnesia:dirty_write(TableName, Conn), + mnesia:dirty_update_counter(PerVhostTableName, VHost, 1), + mnesia:dirty_update_counter(PerUserConnTableName, Username, 1); + [#tracked_connection{}] -> + ok + end, ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok. -%% -%% API -%% +unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_connection_table_name_for(Node), + PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), + PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), + case mnesia:dirty_read(TableName, ConnId) of + [] -> ok; + [#tracked_connection{vhost = VHost, username = Username}] -> + mnesia:dirty_update_counter(PerUserConnTableName, Username, -1), + mnesia:dirty_update_counter(PerVhostTableName, VHost, -1), + mnesia:dirty_delete(TableName, ConnId) + end. --spec boot() -> ok. +-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer(). -%% Sets up and resets connection tracking tables for this -%% node. -boot() -> - ensure_tracked_connections_table_for_this_node(), - rabbit_log:info("Setting up a table for connection tracking on this node: ~p", - [tracked_connection_table_name_for(node())]), - ensure_per_vhost_tracked_connections_table_for_this_node(), - rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", - [tracked_connection_per_vhost_table_name_for(node())]), - clear_tracked_connection_tables_for_this_node(), - ok. +count_tracked_items_in({vhost, VirtualHost}) -> + rabbit_tracking:count_tracked_items( + fun tracked_connection_per_vhost_table_name_for/1, + #tracked_connection_per_vhost.connection_count, VirtualHost, + "connections in vhost"); +count_tracked_items_in({user, Username}) -> + rabbit_tracking:count_tracked_items( + fun tracked_connection_per_user_table_name_for/1, + #tracked_connection_per_user.connection_count, Username, + "connections for user"). + +-spec clear_tracking_tables() -> ok. + +clear_tracking_tables() -> + clear_tracked_connection_tables_for_this_node(). + +-spec shutdown_tracked_items(list(), term()) -> ok. + +shutdown_tracked_items(TrackedItems, Message) -> + close_connections(TrackedItems, Message). +%% Extended API -spec ensure_tracked_connections_table_for_this_node() -> ok. @@ -173,6 +232,13 @@ ensure_per_vhost_tracked_connections_table_for_this_node() -> ensure_per_vhost_tracked_connections_table_for_node(node()). +-spec ensure_per_user_tracked_connections_table_for_this_node() -> ok. + +ensure_per_user_tracked_connections_table_for_this_node() -> + ensure_per_user_tracked_connections_table_for_node(node()). + + +%% Create tables -spec ensure_tracked_connections_table_for_node(node()) -> ok. ensure_tracked_connections_table_for_node(Node) -> @@ -186,7 +252,6 @@ ensure_tracked_connections_table_for_node(Node) -> ok end. - -spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok. ensure_per_vhost_tracked_connections_table_for_node(Node) -> @@ -200,45 +265,44 @@ ensure_per_vhost_tracked_connections_table_for_node(Node) -> ok end. +-spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok. + +ensure_per_user_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_user_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user}, + {attributes, record_info(fields, tracked_connection_per_user)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]), + ok + end. -spec clear_tracked_connection_tables_for_this_node() -> ok. clear_tracked_connection_tables_for_this_node() -> - case mnesia:clear_table(tracked_connection_table_name_for(node())) of - {atomic, ok} -> ok; - {aborted, _} -> ok - end, - case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of - {atomic, ok} -> ok; - {aborted, _} -> ok - end. - + [rabbit_tracking:clear_tracking_table(T) + || T <- get_all_tracked_connection_table_names_for_node(node())]. -spec delete_tracked_connections_table_for_node(node()) -> ok. delete_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_table_name_for(Node), - case mnesia:delete_table(TableName) of - {atomic, ok} -> ok; - {aborted, {no_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. - + rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection"). -spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok. delete_per_vhost_tracked_connections_table_for_node(Node) -> TableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:delete_table(TableName) of - {atomic, ok} -> ok; - {aborted, {no_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. + rabbit_tracking:delete_tracked_table(TableName, Node, + "per-vhost tracked connection"). + +-spec delete_per_user_tracked_connections_table_for_node(node()) -> ok. +delete_per_user_tracked_connections_table_for_node(Node) -> + TableName = tracked_connection_per_user_table_name_for(Node), + rabbit_tracking:delete_tracked_table(TableName, Node, + "per-user tracked connection"). -spec tracked_connection_table_name_for(node()) -> atom(). @@ -250,36 +314,18 @@ tracked_connection_table_name_for(Node) -> tracked_connection_per_vhost_table_name_for(Node) -> list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])). +-spec tracked_connection_per_user_table_name_for(node()) -> atom(). --spec register_connection(rabbit_types:tracked_connection()) -> ok. --dialyzer([{nowarn_function, [register_connection/1]}, race_conditions]). +tracked_connection_per_user_table_name_for(Node) -> + list_to_atom(rabbit_misc:format( + "tracked_connection_table_per_user_on_node_~s", [Node])). -register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - %% upsert - case mnesia:dirty_read(TableName, ConnId) of - [] -> - mnesia:dirty_write(TableName, Conn), - mnesia:dirty_update_counter( - PerVhostTableName, VHost, 1); - [_Row] -> - ok - end, - ok. - --spec unregister_connection(rabbit_types:connection_name()) -> ok. +-spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()]. -unregister_connection(ConnId = {Node, _Name}) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:dirty_read(TableName, ConnId) of - [] -> ok; - [Row] -> - mnesia:dirty_update_counter( - PerVhostTableName, Row#tracked_connection.vhost, -1), - mnesia:dirty_delete(TableName, ConnId) - end. +get_all_tracked_connection_table_names_for_node(Node) -> + [tracked_connection_table_name_for(Node), + tracked_connection_per_vhost_table_name_for(Node), + tracked_connection_per_user_table_name_for(Node)]. -spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'. @@ -317,12 +363,9 @@ count() -> -spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. list(VHost) -> - lists:foldl( - fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). - + rabbit_tracking:match_tracked_items( + fun tracked_connection_table_name_for/1, + #tracked_connection{vhost = VHost, _ = '_'}). -spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. @@ -346,32 +389,23 @@ list_on_node(Node, VHost) -> -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. list_of_user(Username) -> - lists:foldl( - fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - Acc ++ mnesia:dirty_match_object( - Tab, - #tracked_connection{username = Username, _ = '_'}) - end, [], rabbit_mnesia:cluster_nodes(running)). + rabbit_tracking:match_tracked_items( + fun tracked_connection_table_name_for/1, + #tracked_connection{username = Username, _ = '_'}). --spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer(). - -count_connections_in(VirtualHost) -> - lists:foldl(fun (Node, Acc) -> - Tab = tracked_connection_per_vhost_table_name_for(Node), - try - N = case mnesia:dirty_read(Tab, VirtualHost) of - [] -> 0; - [Val] -> Val#tracked_connection_per_vhost.connection_count - end, - Acc + N - catch _:Err -> - rabbit_log:error( - "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n", - [VirtualHost, Err, Node]), - Acc - end - end, 0, rabbit_mnesia:cluster_nodes(running)). +%% Internal, delete tracked entries + +delete_tracked_connection_vhost_entry(Vhost) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_vhost, exists, [Vhost]}, + fun tracked_connection_per_vhost_table_name_for/1, + Vhost). + +delete_tracked_connection_user_entry(Username) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_auth_backend_internal, exists, [Username]}, + fun tracked_connection_per_user_table_name_for/1, + Username). %% Returns a #tracked_connection from connection_created %% event details. @@ -419,7 +453,7 @@ tracked_connection_from_connection_created(EventDetails) -> %% {connected_at,1453214290847}] Name = pget(name, EventDetails), Node = pget(node, EventDetails), - #tracked_connection{id = {Node, Name}, + #tracked_connection{id = rabbit_tracking:id(Node, Name), name = Name, node = Node, vhost = pget(vhost, EventDetails), @@ -476,4 +510,3 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> %% Do an RPC call to the node running the direct client. Node = node(Pid), rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). - diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index c7917e0487..a75defbcef 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -30,16 +30,9 @@ [rabbit_event, ?MODULE, []]}}, {cleanup, {gen_event, delete_handler, [rabbit_event, ?MODULE, []]}}, - {requires, [rabbit_connection_tracking]}, + {requires, [connection_tracking]}, {enables, recovery}]}). --rabbit_boot_step({rabbit_connection_tracking, - [{description, "statistics event manager"}, - {mfa, {rabbit_sup, start_restartable_child, - [rabbit_connection_tracking]}}, - {requires, [rabbit_event, rabbit_node_monitor]}, - {enables, ?MODULE}]}). - %% %% API %% @@ -48,26 +41,26 @@ init([]) -> {ok, []}. handle_event(#event{type = connection_created, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {connection_created, Details}), + _Pid = rabbit_connection_tracking:update_tracked({connection_created, Details}), {ok, State}; handle_event(#event{type = connection_closed, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}), + _Pid = rabbit_connection_tracking:update_tracked({connection_closed, Details}), {ok, State}; handle_event(#event{type = vhost_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({vhost_deleted, Details}), {ok, State}; %% Note: under normal circumstances this will be called immediately %% after the vhost_deleted above. Therefore we should be careful about %% what we log and be more defensive. handle_event(#event{type = vhost_down, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}), + _Pid = rabbit_connection_tracking:update_tracked({vhost_down, Details}), {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({user_deleted, Details}), {ok, State}; %% A node had been deleted from the cluster. handle_event(#event{type = node_deleted, props = Details}, State) -> - gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}), + _Pid = rabbit_connection_tracking:update_tracked({node_deleted, Details}), {ok, State}; handle_event(_Event, State) -> {ok, State}. |
