summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda-D <ayanda.dube@erlang-solutions.com>2020-06-04 10:59:52 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-08-24 20:14:28 +0300
commitdba05644b9be347db78ba48c82375264982f92e6 (patch)
treef23edaabd3251c8e79a0742fa646ff0838b84696
parent716d293e0dfc727700dfb93be64081c5cbf6dd5a (diff)
downloadrabbitmq-server-git-dba05644b9be347db78ba48c82375264982f92e6.tar.gz
Introduce per-user connection tracking
-rw-r--r--src/rabbit_connection_tracking.erl341
-rw-r--r--src/rabbit_connection_tracking_handler.erl21
2 files changed, 194 insertions, 168 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index 9bfe36b925..c9647acbe4 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -7,8 +7,6 @@
-module(rabbit_connection_tracking).
--behaviour(gen_server).
-
%% Abstracts away how tracked connection records are stored
%% and queried.
%%
@@ -17,20 +15,40 @@
%% * rabbit_connection_tracking_handler
%% * rabbit_reader
%% * rabbit_event
+-behaviour(rabbit_tracking).
-export([boot/0,
- ensure_tracked_connections_table_for_node/1,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([ensure_tracked_connections_table_for_node/1,
ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_per_user_tracked_connections_table_for_node/1,
+
ensure_tracked_connections_table_for_this_node/0,
ensure_per_vhost_tracked_connections_table_for_this_node/0,
- tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1,
- delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
+ ensure_per_user_tracked_connections_table_for_this_node/0,
+
+ tracked_connection_table_name_for/1,
+ tracked_connection_per_vhost_table_name_for/1,
+ tracked_connection_per_user_table_name_for/1,
+
+ delete_tracked_connections_table_for_node/1,
+ delete_per_vhost_tracked_connections_table_for_node/1,
+ delete_per_user_tracked_connections_table_for_node/1,
+ delete_tracked_connection_user_entry/1,
+ delete_tracked_connection_vhost_entry/1,
+
clear_tracked_connection_tables_for_this_node/0,
- register_connection/1, unregister_connection/1,
+
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- count_connections_in/1,
lookup/1,
count/0]).
@@ -38,36 +56,47 @@
-import(rabbit_misc, [pget/2]).
--export([start_link/0]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
-export([close_connections/3]).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
%%
-%% GenServer API
+%% API
%%
-init([]) ->
- {ok, nostate}.
+%% Behaviour callbacks
+
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ ensure_per_user_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p",
+ [tracked_connection_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
-handle_call(_Msg, _From, nostate) ->
- {reply, ok, nostate}.
+-spec update_tracked(term()) -> ok.
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]).
-handle_cast({connection_created, Details}, nostate) ->
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({connection_created, Details}) ->
ThisNode = node(),
case pget(node, Details) of
ThisNode ->
TConn = tracked_connection_from_connection_created(Details),
ConnId = TConn#tracked_connection.id,
try
- register_connection(TConn)
+ register_tracked(TConn)
catch
error:{no_exists, _} ->
Msg = "Could not register connection ~p for tracking, "
@@ -82,84 +111,114 @@ handle_cast({connection_created, Details}, nostate) ->
_OtherNode ->
%% ignore
ok
- end,
- {noreply, nostate};
-handle_cast({connection_closed, Details}, nostate) ->
+ end;
+handle_cast({connection_closed, Details}) ->
ThisNode = node(),
case pget(node, Details) of
ThisNode ->
%% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
%% {pid,<0.1774.0>},
%% {node, rabbit@hostname}]
- unregister_connection(
- {pget(node, Details),
- pget(name, Details)});
+ unregister_tracked(
+ rabbit_tracking:id(ThisNode, pget(name, Details)));
_OtherNode ->
%% ignore
ok
- end,
- {noreply, nostate};
-handle_cast({vhost_deleted, Details}, nostate) ->
+ end;
+handle_cast({vhost_deleted, Details}) ->
VHost = pget(name, Details),
+ %% Schedule vhost entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_vhost_entry, [VHost]),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- close_connections(rabbit_connection_tracking:list(VHost),
- rabbit_misc:format("vhost '~s' is deleted", [VHost])),
- {noreply, nostate};
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list(VHost),
+ rabbit_misc:format("vhost '~s' is deleted", [VHost]));
%% Note: under normal circumstances this will be called immediately
%% after the vhost_deleted above. Therefore we should be careful about
%% what we log and be more defensive.
-handle_cast({vhost_down, Details}, nostate) ->
+handle_cast({vhost_down, Details}) ->
VHost = pget(name, Details),
Node = pget(node, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
" because the vhost is stopping",
[VHost, Node]),
- close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
- rabbit_misc:format("vhost '~s' is down", [VHost])),
- {noreply, nostate};
-handle_cast({user_deleted, Details}, nostate) ->
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost]));
+handle_cast({user_deleted, Details}) ->
Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_connection_user_entry, [Username]),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
- close_connections(rabbit_connection_tracking:list_of_user(Username),
- rabbit_misc:format("user '~s' is deleted", [Username])),
- {noreply, nostate};
+ shutdown_tracked_items(
+ rabbit_connection_tracking:list_of_user(Username),
+ rabbit_misc:format("user '~s' is deleted", [Username]));
%% A node had been deleted from the cluster.
-handle_cast({node_deleted, Details}, nostate) ->
+handle_cast({node_deleted, Details}) ->
Node = pget(node, Details),
rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
- rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node),
- rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node),
- {noreply, nostate};
-handle_cast(_Msg, nostate) ->
- {noreply, nostate}.
+ delete_tracked_connections_table_for_node(Node),
+ delete_per_vhost_tracked_connections_table_for_node(Node),
+ delete_per_user_tracked_connections_table_for_node(Node).
-handle_info(_Info, nostate) ->
- {noreply, nostate}.
+-spec register_tracked(rabbit_types:tracked_connection()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
-terminate(_Reason, nostate) ->
+register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] ->
+ mnesia:dirty_write(TableName, Conn),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, 1),
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, 1);
+ [#tracked_connection{}] ->
+ ok
+ end,
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+-spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok.
-%%
-%% API
-%%
+unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
+ PerUserConnTableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
+ [#tracked_connection{vhost = VHost, username = Username}] ->
+ mnesia:dirty_update_counter(PerUserConnTableName, Username, -1),
+ mnesia:dirty_update_counter(PerVhostTableName, VHost, -1),
+ mnesia:dirty_delete(TableName, ConnId)
+ end.
--spec boot() -> ok.
+-spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer().
-%% Sets up and resets connection tracking tables for this
-%% node.
-boot() ->
- ensure_tracked_connections_table_for_this_node(),
- rabbit_log:info("Setting up a table for connection tracking on this node: ~p",
- [tracked_connection_table_name_for(node())]),
- ensure_per_vhost_tracked_connections_table_for_this_node(),
- rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p",
- [tracked_connection_per_vhost_table_name_for(node())]),
- clear_tracked_connection_tables_for_this_node(),
- ok.
+count_tracked_items_in({vhost, VirtualHost}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_vhost_table_name_for/1,
+ #tracked_connection_per_vhost.connection_count, VirtualHost,
+ "connections in vhost");
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_connection_per_user_table_name_for/1,
+ #tracked_connection_per_user.connection_count, Username,
+ "connections for user").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_connection_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, Message) ->
+ close_connections(TrackedItems, Message).
+%% Extended API
-spec ensure_tracked_connections_table_for_this_node() -> ok.
@@ -173,6 +232,13 @@ ensure_per_vhost_tracked_connections_table_for_this_node() ->
ensure_per_vhost_tracked_connections_table_for_node(node()).
+-spec ensure_per_user_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_user_tracked_connections_table_for_this_node() ->
+ ensure_per_user_tracked_connections_table_for_node(node()).
+
+
+%% Create tables
-spec ensure_tracked_connections_table_for_node(node()) -> ok.
ensure_tracked_connections_table_for_node(Node) ->
@@ -186,7 +252,6 @@ ensure_tracked_connections_table_for_node(Node) ->
ok
end.
-
-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
ensure_per_vhost_tracked_connections_table_for_node(Node) ->
@@ -200,45 +265,44 @@ ensure_per_vhost_tracked_connections_table_for_node(Node) ->
ok
end.
+-spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user},
+ {attributes, record_info(fields, tracked_connection_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
-spec clear_tracked_connection_tables_for_this_node() -> ok.
clear_tracked_connection_tables_for_this_node() ->
- case mnesia:clear_table(tracked_connection_table_name_for(node())) of
- {atomic, ok} -> ok;
- {aborted, _} -> ok
- end,
- case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of
- {atomic, ok} -> ok;
- {aborted, _} -> ok
- end.
-
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_connection_table_names_for_node(node())].
-spec delete_tracked_connections_table_for_node(node()) -> ok.
delete_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_table_name_for(Node),
- case mnesia:delete_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, {no_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to delete a tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
-
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection").
-spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok.
delete_per_vhost_tracked_connections_table_for_node(Node) ->
TableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:delete_table(TableName) of
- {atomic, ok} -> ok;
- {aborted, {no_exists, _}} -> ok;
- {aborted, Error} ->
- rabbit_log:error("Failed to delete a per-vhost tracked connection table for node ~p: ~p", [Node, Error]),
- ok
- end.
+ rabbit_tracking:delete_tracked_table(TableName, Node,
+ "per-vhost tracked connection").
+
+-spec delete_per_user_tracked_connections_table_for_node(node()) -> ok.
+delete_per_user_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracked_table(TableName, Node,
+ "per-user tracked connection").
-spec tracked_connection_table_name_for(node()) -> atom().
@@ -250,36 +314,18 @@ tracked_connection_table_name_for(Node) ->
tracked_connection_per_vhost_table_name_for(Node) ->
list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+-spec tracked_connection_per_user_table_name_for(node()) -> atom().
--spec register_connection(rabbit_types:tracked_connection()) -> ok.
--dialyzer([{nowarn_function, [register_connection/1]}, race_conditions]).
+tracked_connection_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_connection_table_per_user_on_node_~s", [Node])).
-register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- %% upsert
- case mnesia:dirty_read(TableName, ConnId) of
- [] ->
- mnesia:dirty_write(TableName, Conn),
- mnesia:dirty_update_counter(
- PerVhostTableName, VHost, 1);
- [_Row] ->
- ok
- end,
- ok.
-
--spec unregister_connection(rabbit_types:connection_name()) -> ok.
+-spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()].
-unregister_connection(ConnId = {Node, _Name}) when Node =:= node() ->
- TableName = tracked_connection_table_name_for(Node),
- PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
- case mnesia:dirty_read(TableName, ConnId) of
- [] -> ok;
- [Row] ->
- mnesia:dirty_update_counter(
- PerVhostTableName, Row#tracked_connection.vhost, -1),
- mnesia:dirty_delete(TableName, ConnId)
- end.
+get_all_tracked_connection_table_names_for_node(Node) ->
+ [tracked_connection_table_name_for(Node),
+ tracked_connection_per_vhost_table_name_for(Node),
+ tracked_connection_per_user_table_name_for(Node)].
-spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'.
@@ -317,12 +363,9 @@ count() ->
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
list(VHost) ->
- lists:foldl(
- fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
-
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{vhost = VHost, _ = '_'}).
-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
@@ -346,32 +389,23 @@ list_on_node(Node, VHost) ->
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
- lists:foldl(
- fun (Node, Acc) ->
- Tab = tracked_connection_table_name_for(Node),
- Acc ++ mnesia:dirty_match_object(
- Tab,
- #tracked_connection{username = Username, _ = '_'})
- end, [], rabbit_mnesia:cluster_nodes(running)).
+ rabbit_tracking:match_tracked_items(
+ fun tracked_connection_table_name_for/1,
+ #tracked_connection{username = Username, _ = '_'}).
--spec count_connections_in(rabbit_types:vhost()) -> non_neg_integer().
-
-count_connections_in(VirtualHost) ->
- lists:foldl(fun (Node, Acc) ->
- Tab = tracked_connection_per_vhost_table_name_for(Node),
- try
- N = case mnesia:dirty_read(Tab, VirtualHost) of
- [] -> 0;
- [Val] -> Val#tracked_connection_per_vhost.connection_count
- end,
- Acc + N
- catch _:Err ->
- rabbit_log:error(
- "Failed to fetch number of connections in vhost ~p on node ~p:~n~p~n",
- [VirtualHost, Err, Node]),
- Acc
- end
- end, 0, rabbit_mnesia:cluster_nodes(running)).
+%% Internal, delete tracked entries
+
+delete_tracked_connection_vhost_entry(Vhost) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_vhost, exists, [Vhost]},
+ fun tracked_connection_per_vhost_table_name_for/1,
+ Vhost).
+
+delete_tracked_connection_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_connection_per_user_table_name_for/1,
+ Username).
%% Returns a #tracked_connection from connection_created
%% event details.
@@ -419,7 +453,7 @@ tracked_connection_from_connection_created(EventDetails) ->
%% {connected_at,1453214290847}]
Name = pget(name, EventDetails),
Node = pget(node, EventDetails),
- #tracked_connection{id = {Node, Name},
+ #tracked_connection{id = rabbit_tracking:id(Node, Name),
name = Name,
node = Node,
vhost = pget(vhost, EventDetails),
@@ -476,4 +510,3 @@ close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
-
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index c7917e0487..a75defbcef 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -30,16 +30,9 @@
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
- {requires, [rabbit_connection_tracking]},
+ {requires, [connection_tracking]},
{enables, recovery}]}).
--rabbit_boot_step({rabbit_connection_tracking,
- [{description, "statistics event manager"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_connection_tracking]}},
- {requires, [rabbit_event, rabbit_node_monitor]},
- {enables, ?MODULE}]}).
-
%%
%% API
%%
@@ -48,26 +41,26 @@ init([]) ->
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {connection_created, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({connection_created, Details}),
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({connection_closed, Details}),
{ok, State};
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({vhost_deleted, Details}),
{ok, State};
%% Note: under normal circumstances this will be called immediately
%% after the vhost_deleted above. Therefore we should be careful about
%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({vhost_down, Details}),
{ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({user_deleted, Details}),
{ok, State};
%% A node had been deleted from the cluster.
handle_event(#event{type = node_deleted, props = Details}, State) ->
- gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}),
+ _Pid = rabbit_connection_tracking:update_tracked({node_deleted, Details}),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.