summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAyanda-D <ayanda.dube@erlang-solutions.com>2020-06-10 10:10:48 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-09-02 04:28:58 +0300
commit3ea173725611662a666c95c336c3f3d0d34ff74a (patch)
treee7510513f46580965a1e5365efcea456a8d2555f /src
parent275cecce2f9f429a77190712ad3f45afcbc31459 (diff)
downloadrabbitmq-server-git-3ea173725611662a666c95c336c3f3d0d34ff74a.tar.gz
Introduce per-user channel tracking
Make 'tracking_execution_timeout' configurable Add per_user_connection_channel_tracking_SUITE
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl5
-rw-r--r--src/rabbit_channel_tracking.erl298
-rw-r--r--src/rabbit_channel_tracking_handler.erl80
-rw-r--r--src/rabbit_connection_tracking.erl5
4 files changed, 386 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f58ae1db94..f057fb0da3 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,6 +199,11 @@
{mfa, {rabbit_connection_tracking, boot, []}},
{enables, routing_ready}]}).
+-rabbit_boot_step({channel_tracking,
+ [{description, "channel tracking infrastructure"},
+ {mfa, {rabbit_channel_tracking, boot, []}},
+ {enables, routing_ready}]}).
+
-rabbit_boot_step({background_gc,
[{description, "background garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl
new file mode 100644
index 0000000000..aac3b92e1b
--- /dev/null
+++ b/src/rabbit_channel_tracking.erl
@@ -0,0 +1,298 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking).
+
+%% Abstracts away how tracked connection records are stored
+%% and queried.
+%%
+%% See also:
+%%
+%% * rabbit_channel_tracking_handler
+%% * rabbit_reader
+%% * rabbit_event
+-behaviour(rabbit_tracking).
+
+-export([boot/0,
+ update_tracked/1,
+ handle_cast/1,
+ register_tracked/1,
+ unregister_tracked/1,
+ count_tracked_items_in/1,
+ clear_tracking_tables/0,
+ shutdown_tracked_items/2]).
+
+-export([list/0, list_of_user/1, list_on_node/1,
+ tracked_channel_table_name_for/1,
+ tracked_channel_per_user_table_name_for/1,
+ get_all_tracked_channel_table_names_for_node/1,
+ delete_tracked_channel_user_entry/1]).
+
+-include_lib("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+%%
+%% API
+%%
+
+%% Sets up and resets channel tracking tables for this node.
+-spec boot() -> ok.
+
+boot() ->
+ ensure_tracked_channels_table_for_this_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_table_name_for(node())]),
+ ensure_per_user_tracked_channels_table_for_node(),
+ rabbit_log:info("Setting up a table for channel tracking on this node: ~p",
+ [tracked_channel_per_user_table_name_for(node())]),
+ clear_tracking_tables(),
+ ok.
+
+-spec update_tracked(term()) -> ok.
+
+update_tracked(Event) ->
+ spawn(?MODULE, handle_cast, [Event]).
+
+%% Asynchronously handle update events
+-spec handle_cast(term()) -> ok.
+
+handle_cast({channel_created, Details}) ->
+ ThisNode = node(),
+ case node(pget(pid, Details)) of
+ ThisNode ->
+ TrackedCh = #tracked_channel{id = TrackedChId} =
+ tracked_channel_from_channel_created_event(Details),
+ try
+ register_tracked(TrackedCh)
+ catch
+ error:{no_exists, _} ->
+ Msg = "Could not register channel ~p for tracking, "
+ "its table is not ready yet or the channel terminated prematurely",
+ rabbit_log_connection:warning(Msg, [TrackedChId]),
+ ok;
+ error:Err ->
+ Msg = "Could not register channel ~p for tracking: ~p",
+ rabbit_log_connection:warning(Msg, [TrackedChId, Err]),
+ ok
+ end;
+ _OtherNode ->
+ %% ignore
+ ok
+ end;
+handle_cast({channel_closed, Details}) ->
+ %% channel has terminated, unregister iff local
+ case get_tracked_channel_by_pid(pget(pid, Details)) of
+ [#tracked_channel{name = Name}] ->
+ unregister_tracked(rabbit_tracking:id(node(), Name));
+ _Other -> ok
+ end;
+handle_cast({connection_closed, ConnDetails}) ->
+ ThisNode= node(),
+ ConnPid = pget(pid, ConnDetails),
+
+ case pget(node, ConnDetails) of
+ ThisNode ->
+ TrackedChs = get_tracked_channels_by_connection_pid(ConnPid),
+ rabbit_log_connection:info(
+ "Closing all channels from connection '~p' "
+ "because it has been closed", [pget(name, ConnDetails)]),
+ shutdown_tracked_items(TrackedChs, undefined),
+ [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) ||
+ #tracked_channel{name = Name} <- TrackedChs],
+ ok;
+ _DifferentNode ->
+ ok
+ end;
+handle_cast({user_deleted, Details}) ->
+ Username = pget(name, Details),
+ %% Schedule user entry deletion, allowing time for connections to close
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ delete_tracked_channel_user_entry, [Username]),
+ ok;
+handle_cast({node_deleted, Details}) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info(
+ "Node '~s' was removed from the cluster, deleting"
+ " its channel tracking tables...", [Node]),
+ delete_tracked_channels_table_for_node(Node),
+ delete_per_user_tracked_channels_table_for_node(Node).
+
+-spec register_tracked(rabbit_types:tracked_channel()) -> ok.
+-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]).
+
+register_tracked(TrackedCh =
+ #tracked_channel{node = Node, name = Name, username = Username}) ->
+ ChId = rabbit_tracking:id(Node, Name),
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChTableName = tracked_channel_per_user_table_name_for(Node),
+ %% upsert
+ case mnesia:dirty_read(TableName, ChId) of
+ [] ->
+ mnesia:dirty_write(TableName, TrackedCh),
+ mnesia:dirty_update_counter(PerUserChTableName, Username, 1);
+ [#tracked_channel{}] ->
+ ok
+ end,
+ ok.
+
+-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok.
+
+unregister_tracked(ChId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_channel_table_name_for(Node),
+ PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:dirty_read(TableName, ChId) of
+ [] -> ok;
+ [#tracked_channel{username = Username}] ->
+ mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1),
+ mnesia:dirty_delete(TableName, ChId)
+ end.
+
+-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer().
+
+count_tracked_items_in({user, Username}) ->
+ rabbit_tracking:count_tracked_items(
+ fun tracked_channel_per_user_table_name_for/1,
+ #tracked_channel_per_user.channel_count, Username,
+ "channels in vhost").
+
+-spec clear_tracking_tables() -> ok.
+
+clear_tracking_tables() ->
+ clear_tracked_channel_tables_for_this_node().
+
+-spec shutdown_tracked_items(list(), term()) -> ok.
+
+shutdown_tracked_items(TrackedItems, _Args) ->
+ close_channels(TrackedItems).
+
+%% helper functions
+-spec list() -> [rabbit_types:tracked_channel()].
+
+list() ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = tracked_channel_table_name_for(Node),
+ Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
+-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
+
+list_of_user(Username) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{username = Username, _ = '_'}).
+
+-spec list_on_node(node()) -> [rabbit_types:tracked_channel()].
+
+list_on_node(Node) ->
+ try mnesia:dirty_match_object(
+ tracked_channel_table_name_for(Node),
+ #tracked_channel{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+-spec tracked_channel_table_name_for(node()) -> atom().
+
+tracked_channel_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])).
+
+-spec tracked_channel_per_user_table_name_for(node()) -> atom().
+
+tracked_channel_per_user_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format(
+ "tracked_channel_table_per_user_on_node_~s", [Node])).
+
+%% internal
+ensure_tracked_channels_table_for_this_node() ->
+ ensure_tracked_channels_table_for_node(node()).
+
+ensure_per_user_tracked_channels_table_for_node() ->
+ ensure_per_user_tracked_channels_table_for_node(node()).
+
+%% Create tables
+ensure_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel},
+ {attributes, record_info(fields, tracked_channel)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+ensure_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user},
+ {attributes, record_info(fields, tracked_channel_per_user)}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]),
+ ok
+ end.
+
+clear_tracked_channel_tables_for_this_node() ->
+ [rabbit_tracking:clear_tracking_table(T)
+ || T <- get_all_tracked_channel_table_names_for_node(node())].
+
+delete_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel").
+
+delete_per_user_tracked_channels_table_for_node(Node) ->
+ TableName = tracked_channel_per_user_table_name_for(Node),
+ rabbit_tracking:delete_tracking_table(TableName, Node,
+ "per-user tracked channels").
+
+get_all_tracked_channel_table_names_for_node(Node) ->
+ [tracked_channel_table_name_for(Node),
+ tracked_channel_per_user_table_name_for(Node)].
+
+get_tracked_channels_by_connection_pid(ConnPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{connection = ConnPid, _ = '_'}).
+
+get_tracked_channel_by_pid(ChPid) ->
+ rabbit_tracking:match_tracked_items(
+ fun tracked_channel_table_name_for/1,
+ #tracked_channel{pid = ChPid, _ = '_'}).
+
+delete_tracked_channel_user_entry(Username) ->
+ rabbit_tracking:delete_tracked_entry(
+ {rabbit_auth_backend_internal, exists, [Username]},
+ fun tracked_channel_per_user_table_name_for/1,
+ Username).
+
+tracked_channel_from_channel_created_event(ChannelDetails) ->
+ Node = node(ChPid = pget(pid, ChannelDetails)),
+ Name = pget(name, ChannelDetails),
+ #tracked_channel{
+ id = rabbit_tracking:id(Node, Name),
+ name = Name,
+ node = Node,
+ vhost = pget(vhost, ChannelDetails),
+ pid = ChPid,
+ connection = pget(connection, ChannelDetails),
+ username = pget(user, ChannelDetails)}.
+
+close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
+ [rabbit_channel:shutdown(ChPid)
+ || #tracked_channel{pid = ChPid} <- TrackedChannels],
+ ok;
+close_channels(_TrackedChannels = []) -> ok.
diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl
new file mode 100644
index 0000000000..b2556b8742
--- /dev/null
+++ b/src/rabbit_channel_tracking_handler.erl
@@ -0,0 +1,80 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_channel_tracking_handler).
+
+%% This module keeps track of channel creation and termination events
+%% on its local node. Similar to the rabbit_connection_tracking_handler,
+%% the primary goal here is to decouple channel tracking from rabbit_reader
+%% and isolate channel tracking to its own process to avoid blocking connection
+%% creation events. Additionaly, creation events are also non-blocking in that
+%% they spawn a short-live process for updating the tracking tables in realtime.
+%%
+%% Events from other nodes are ignored.
+
+-behaviour(gen_event).
+
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include_lib("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "channel tracking event handler"},
+ {mfa, {gen_event, add_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {cleanup, {gen_event, delete_handler,
+ [rabbit_event, ?MODULE, []]}},
+ {requires, [channel_tracking]},
+ {enables, recovery}]}).
+
+%%
+%% API
+%%
+
+init([]) ->
+ {ok, []}.
+
+handle_event(#event{type = channel_created, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}),
+ {ok, State};
+handle_event(#event{type = channel_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}),
+ {ok, State};
+handle_event(#event{type = connection_closed, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}),
+ {ok, State};
+handle_event(#event{type = user_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}),
+ {ok, State};
+%% A node had been deleted from the cluster.
+handle_event(#event{type = node_deleted, props = Details}, State) ->
+ _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, not_understood, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index c9647acbe4..85ce317bba 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -37,6 +37,7 @@
tracked_connection_table_name_for/1,
tracked_connection_per_vhost_table_name_for/1,
tracked_connection_per_user_table_name_for/1,
+ get_all_tracked_connection_table_names_for_node/1,
delete_tracked_connections_table_for_node/1,
delete_per_vhost_tracked_connections_table_for_node/1,
@@ -128,7 +129,7 @@ handle_cast({connection_closed, Details}) ->
handle_cast({vhost_deleted, Details}) ->
VHost = pget(name, Details),
%% Schedule vhost entry deletion, allowing time for connections to close
- _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_vhost_entry, [VHost]),
rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
shutdown_tracked_items(
@@ -149,7 +150,7 @@ handle_cast({vhost_down, Details}) ->
handle_cast({user_deleted, Details}) ->
Username = pget(name, Details),
%% Schedule user entry deletion, allowing time for connections to close
- _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE,
+ _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE,
delete_tracked_connection_user_entry, [Username]),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
shutdown_tracked_items(