diff options
| author | Ayanda-D <ayanda.dube@erlang-solutions.com> | 2020-06-10 10:10:48 +0100 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-09-02 04:28:58 +0300 |
| commit | 3ea173725611662a666c95c336c3f3d0d34ff74a (patch) | |
| tree | e7510513f46580965a1e5365efcea456a8d2555f /src | |
| parent | 275cecce2f9f429a77190712ad3f45afcbc31459 (diff) | |
| download | rabbitmq-server-git-3ea173725611662a666c95c336c3f3d0d34ff74a.tar.gz | |
Introduce per-user channel tracking
Make 'tracking_execution_timeout' configurable
Add per_user_connection_channel_tracking_SUITE
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel_tracking.erl | 298 | ||||
| -rw-r--r-- | src/rabbit_channel_tracking_handler.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 5 |
4 files changed, 386 insertions, 2 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f58ae1db94..f057fb0da3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,6 +199,11 @@ {mfa, {rabbit_connection_tracking, boot, []}}, {enables, routing_ready}]}). +-rabbit_boot_step({channel_tracking, + [{description, "channel tracking infrastructure"}, + {mfa, {rabbit_channel_tracking, boot, []}}, + {enables, routing_ready}]}). + -rabbit_boot_step({background_gc, [{description, "background garbage collection"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_channel_tracking.erl b/src/rabbit_channel_tracking.erl new file mode 100644 index 0000000000..aac3b92e1b --- /dev/null +++ b/src/rabbit_channel_tracking.erl @@ -0,0 +1,298 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking). + +%% Abstracts away how tracked connection records are stored +%% and queried. +%% +%% See also: +%% +%% * rabbit_channel_tracking_handler +%% * rabbit_reader +%% * rabbit_event +-behaviour(rabbit_tracking). + +-export([boot/0, + update_tracked/1, + handle_cast/1, + register_tracked/1, + unregister_tracked/1, + count_tracked_items_in/1, + clear_tracking_tables/0, + shutdown_tracked_items/2]). + +-export([list/0, list_of_user/1, list_on_node/1, + tracked_channel_table_name_for/1, + tracked_channel_per_user_table_name_for/1, + get_all_tracked_channel_table_names_for_node/1, + delete_tracked_channel_user_entry/1]). + +-include_lib("rabbit.hrl"). + +-import(rabbit_misc, [pget/2]). + +%% +%% API +%% + +%% Sets up and resets channel tracking tables for this node. +-spec boot() -> ok. + +boot() -> + ensure_tracked_channels_table_for_this_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_table_name_for(node())]), + ensure_per_user_tracked_channels_table_for_node(), + rabbit_log:info("Setting up a table for channel tracking on this node: ~p", + [tracked_channel_per_user_table_name_for(node())]), + clear_tracking_tables(), + ok. + +-spec update_tracked(term()) -> ok. + +update_tracked(Event) -> + spawn(?MODULE, handle_cast, [Event]). + +%% Asynchronously handle update events +-spec handle_cast(term()) -> ok. + +handle_cast({channel_created, Details}) -> + ThisNode = node(), + case node(pget(pid, Details)) of + ThisNode -> + TrackedCh = #tracked_channel{id = TrackedChId} = + tracked_channel_from_channel_created_event(Details), + try + register_tracked(TrackedCh) + catch + error:{no_exists, _} -> + Msg = "Could not register channel ~p for tracking, " + "its table is not ready yet or the channel terminated prematurely", + rabbit_log_connection:warning(Msg, [TrackedChId]), + ok; + error:Err -> + Msg = "Could not register channel ~p for tracking: ~p", + rabbit_log_connection:warning(Msg, [TrackedChId, Err]), + ok + end; + _OtherNode -> + %% ignore + ok + end; +handle_cast({channel_closed, Details}) -> + %% channel has terminated, unregister iff local + case get_tracked_channel_by_pid(pget(pid, Details)) of + [#tracked_channel{name = Name}] -> + unregister_tracked(rabbit_tracking:id(node(), Name)); + _Other -> ok + end; +handle_cast({connection_closed, ConnDetails}) -> + ThisNode= node(), + ConnPid = pget(pid, ConnDetails), + + case pget(node, ConnDetails) of + ThisNode -> + TrackedChs = get_tracked_channels_by_connection_pid(ConnPid), + rabbit_log_connection:info( + "Closing all channels from connection '~p' " + "because it has been closed", [pget(name, ConnDetails)]), + shutdown_tracked_items(TrackedChs, undefined), + [unregister_tracked(rabbit_tracking:id(ThisNode, Name)) || + #tracked_channel{name = Name} <- TrackedChs], + ok; + _DifferentNode -> + ok + end; +handle_cast({user_deleted, Details}) -> + Username = pget(name, Details), + %% Schedule user entry deletion, allowing time for connections to close + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, + delete_tracked_channel_user_entry, [Username]), + ok; +handle_cast({node_deleted, Details}) -> + Node = pget(node, Details), + rabbit_log_connection:info( + "Node '~s' was removed from the cluster, deleting" + " its channel tracking tables...", [Node]), + delete_tracked_channels_table_for_node(Node), + delete_per_user_tracked_channels_table_for_node(Node). + +-spec register_tracked(rabbit_types:tracked_channel()) -> ok. +-dialyzer([{nowarn_function, [register_tracked/1]}, race_conditions]). + +register_tracked(TrackedCh = + #tracked_channel{node = Node, name = Name, username = Username}) -> + ChId = rabbit_tracking:id(Node, Name), + TableName = tracked_channel_table_name_for(Node), + PerUserChTableName = tracked_channel_per_user_table_name_for(Node), + %% upsert + case mnesia:dirty_read(TableName, ChId) of + [] -> + mnesia:dirty_write(TableName, TrackedCh), + mnesia:dirty_update_counter(PerUserChTableName, Username, 1); + [#tracked_channel{}] -> + ok + end, + ok. + +-spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok. + +unregister_tracked(ChId = {Node, _Name}) when Node =:= node() -> + TableName = tracked_channel_table_name_for(Node), + PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:dirty_read(TableName, ChId) of + [] -> ok; + [#tracked_channel{username = Username}] -> + mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1), + mnesia:dirty_delete(TableName, ChId) + end. + +-spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer(). + +count_tracked_items_in({user, Username}) -> + rabbit_tracking:count_tracked_items( + fun tracked_channel_per_user_table_name_for/1, + #tracked_channel_per_user.channel_count, Username, + "channels in vhost"). + +-spec clear_tracking_tables() -> ok. + +clear_tracking_tables() -> + clear_tracked_channel_tables_for_this_node(). + +-spec shutdown_tracked_items(list(), term()) -> ok. + +shutdown_tracked_items(TrackedItems, _Args) -> + close_channels(TrackedItems). + +%% helper functions +-spec list() -> [rabbit_types:tracked_channel()]. + +list() -> + lists:foldl( + fun (Node, Acc) -> + Tab = tracked_channel_table_name_for(Node), + Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'}) + end, [], rabbit_mnesia:cluster_nodes(running)). + +-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()]. + +list_of_user(Username) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{username = Username, _ = '_'}). + +-spec list_on_node(node()) -> [rabbit_types:tracked_channel()]. + +list_on_node(Node) -> + try mnesia:dirty_match_object( + tracked_channel_table_name_for(Node), + #tracked_channel{_ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + +-spec tracked_channel_table_name_for(node()) -> atom(). + +tracked_channel_table_name_for(Node) -> + list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])). + +-spec tracked_channel_per_user_table_name_for(node()) -> atom(). + +tracked_channel_per_user_table_name_for(Node) -> + list_to_atom(rabbit_misc:format( + "tracked_channel_table_per_user_on_node_~s", [Node])). + +%% internal +ensure_tracked_channels_table_for_this_node() -> + ensure_tracked_channels_table_for_node(node()). + +ensure_per_user_tracked_channels_table_for_node() -> + ensure_per_user_tracked_channels_table_for_node(node()). + +%% Create tables +ensure_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel}, + {attributes, record_info(fields, tracked_channel)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +ensure_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user}, + {attributes, record_info(fields, tracked_channel_per_user)}]) of + {atomic, ok} -> ok; + {aborted, {already_exists, _}} -> ok; + {aborted, Error} -> + rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]), + ok + end. + +clear_tracked_channel_tables_for_this_node() -> + [rabbit_tracking:clear_tracking_table(T) + || T <- get_all_tracked_channel_table_names_for_node(node())]. + +delete_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel"). + +delete_per_user_tracked_channels_table_for_node(Node) -> + TableName = tracked_channel_per_user_table_name_for(Node), + rabbit_tracking:delete_tracking_table(TableName, Node, + "per-user tracked channels"). + +get_all_tracked_channel_table_names_for_node(Node) -> + [tracked_channel_table_name_for(Node), + tracked_channel_per_user_table_name_for(Node)]. + +get_tracked_channels_by_connection_pid(ConnPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{connection = ConnPid, _ = '_'}). + +get_tracked_channel_by_pid(ChPid) -> + rabbit_tracking:match_tracked_items( + fun tracked_channel_table_name_for/1, + #tracked_channel{pid = ChPid, _ = '_'}). + +delete_tracked_channel_user_entry(Username) -> + rabbit_tracking:delete_tracked_entry( + {rabbit_auth_backend_internal, exists, [Username]}, + fun tracked_channel_per_user_table_name_for/1, + Username). + +tracked_channel_from_channel_created_event(ChannelDetails) -> + Node = node(ChPid = pget(pid, ChannelDetails)), + Name = pget(name, ChannelDetails), + #tracked_channel{ + id = rabbit_tracking:id(Node, Name), + name = Name, + node = Node, + vhost = pget(vhost, ChannelDetails), + pid = ChPid, + connection = pget(connection, ChannelDetails), + username = pget(user, ChannelDetails)}. + +close_channels(TrackedChannels = [#tracked_channel{}|_]) -> + [rabbit_channel:shutdown(ChPid) + || #tracked_channel{pid = ChPid} <- TrackedChannels], + ok; +close_channels(_TrackedChannels = []) -> ok. diff --git a/src/rabbit_channel_tracking_handler.erl b/src/rabbit_channel_tracking_handler.erl new file mode 100644 index 0000000000..b2556b8742 --- /dev/null +++ b/src/rabbit_channel_tracking_handler.erl @@ -0,0 +1,80 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_channel_tracking_handler). + +%% This module keeps track of channel creation and termination events +%% on its local node. Similar to the rabbit_connection_tracking_handler, +%% the primary goal here is to decouple channel tracking from rabbit_reader +%% and isolate channel tracking to its own process to avoid blocking connection +%% creation events. Additionaly, creation events are also non-blocking in that +%% they spawn a short-live process for updating the tracking tables in realtime. +%% +%% Events from other nodes are ignored. + +-behaviour(gen_event). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). + +-include_lib("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "channel tracking event handler"}, + {mfa, {gen_event, add_handler, + [rabbit_event, ?MODULE, []]}}, + {cleanup, {gen_event, delete_handler, + [rabbit_event, ?MODULE, []]}}, + {requires, [channel_tracking]}, + {enables, recovery}]}). + +%% +%% API +%% + +init([]) -> + {ok, []}. + +handle_event(#event{type = channel_created, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_created, Details}), + {ok, State}; +handle_event(#event{type = channel_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({channel_closed, Details}), + {ok, State}; +handle_event(#event{type = connection_closed, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({connection_closed, Details}), + {ok, State}; +handle_event(#event{type = user_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({user_deleted, Details}), + {ok, State}; +%% A node had been deleted from the cluster. +handle_event(#event{type = node_deleted, props = Details}, State) -> + _Pid = rabbit_channel_tracking:update_tracked({node_deleted, Details}), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Arg, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index c9647acbe4..85ce317bba 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -37,6 +37,7 @@ tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, tracked_connection_per_user_table_name_for/1, + get_all_tracked_connection_table_names_for_node/1, delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, @@ -128,7 +129,7 @@ handle_cast({connection_closed, Details}) -> handle_cast({vhost_deleted, Details}) -> VHost = pget(name, Details), %% Schedule vhost entry deletion, allowing time for connections to close - _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, delete_tracked_connection_vhost_entry, [VHost]), rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), shutdown_tracked_items( @@ -149,7 +150,7 @@ handle_cast({vhost_down, Details}) -> handle_cast({user_deleted, Details}) -> Username = pget(name, Details), %% Schedule user entry deletion, allowing time for connections to close - _ = timer:apply_after(?SCHEDULED_TRACKING_EXECUTION_TIMEOUT, ?MODULE, + _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, delete_tracked_connection_user_entry, [Username]), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), shutdown_tracked_items( |
