diff options
-rw-r--r-- | deps/rabbit/src/rabbit_channel_tracking.erl | 145 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_connection_tracking.erl | 258 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_tracking.erl | 83 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_tracking_store.erl | 47 | ||||
-rw-r--r-- | deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl | 38 |
5 files changed, 219 insertions, 352 deletions
diff --git a/deps/rabbit/src/rabbit_channel_tracking.erl b/deps/rabbit/src/rabbit_channel_tracking.erl index bb798645b4..b608bfe01b 100644 --- a/deps/rabbit/src/rabbit_channel_tracking.erl +++ b/deps/rabbit/src/rabbit_channel_tracking.erl @@ -27,15 +27,15 @@ shutdown_tracked_items/2]). -export([list/0, list_of_user/1, list_on_node/1, - tracked_channel_table_name_for/1, - tracked_channel_per_user_table_name_for/1, - get_all_tracked_channel_table_names_for_node/1, delete_tracked_channel_user_entry/1]). -include_lib("rabbit_common/include/rabbit.hrl"). -import(rabbit_misc, [pget/2]). +-define(TRACKED_CHANNEL_TABLE, tracked_channel). +-define(TRACKED_CHANNEL_TABLE_PER_USER, tracked_channel_per_user). + %% %% API %% @@ -44,13 +44,11 @@ -spec boot() -> ok. boot() -> - ensure_tracked_channels_table_for_this_node(), + ensure_tracked_tables_for_this_node(), rabbit_log:info("Setting up a table for channel tracking on this node: ~p", - [tracked_channel_table_name_for(node())]), - ensure_per_user_tracked_channels_table_for_node(), + [?TRACKED_CHANNEL_TABLE]), rabbit_log:info("Setting up a table for channel tracking on this node: ~p", - [tracked_channel_per_user_table_name_for(node())]), - clear_tracking_tables(), + [?TRACKED_CHANNEL_TABLE_PER_USER]), ok. -spec update_tracked(term()) -> ok. @@ -115,51 +113,41 @@ handle_cast({user_deleted, Details}) -> _ = timer:apply_after(?TRACKING_EXECUTION_TIMEOUT, ?MODULE, delete_tracked_channel_user_entry, [Username]), ok; -handle_cast({node_deleted, Details}) -> - Node = pget(node, Details), - rabbit_log_channel:info( - "Node '~s' was removed from the cluster, deleting" - " its channel tracking tables...", [Node]), - delete_tracked_channels_table_for_node(Node), - delete_per_user_tracked_channels_table_for_node(Node). +handle_cast({node_deleted, _Details}) -> + ok. -spec register_tracked(rabbit_types:tracked_channel()) -> ok. -dialyzer([{nowarn_function, [register_tracked/1]}]). register_tracked(TrackedCh = - #tracked_channel{node = Node, name = Name, username = Username}) -> + #tracked_channel{node = Node, name = Name, username = Username}) when Node == node() -> ChId = rabbit_tracking:id(Node, Name), - TableName = tracked_channel_table_name_for(Node), - PerUserChTableName = tracked_channel_per_user_table_name_for(Node), %% upsert - case mnesia:dirty_read(TableName, ChId) of - [] -> - mnesia:dirty_write(TableName, TrackedCh), - mnesia:dirty_update_counter(PerUserChTableName, Username, 1); - [#tracked_channel{}] -> - ok + case ets:lookup(?TRACKED_CHANNEL_TABLE, ChId) of + [] -> + ets:insert(?TRACKED_CHANNEL_TABLE, TrackedCh), + ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, 1, {Username, 0}); + [#tracked_channel{}] -> + ok end, ok. -spec unregister_tracked(rabbit_types:tracked_channel_id()) -> ok. unregister_tracked(ChId = {Node, _Name}) when Node =:= node() -> - TableName = tracked_channel_table_name_for(Node), - PerUserChannelTableName = tracked_channel_per_user_table_name_for(Node), - case mnesia:dirty_read(TableName, ChId) of + case ets:lookup(?TRACKED_CHANNEL_TABLE, ChId) of [] -> ok; [#tracked_channel{username = Username}] -> - mnesia:dirty_update_counter(PerUserChannelTableName, Username, -1), - mnesia:dirty_delete(TableName, ChId) + ets:update_counter(?TRACKED_CHANNEL_TABLE_PER_USER, Username, -1), + ets:delete(?TRACKED_CHANNEL_TABLE, ChId) end. -spec count_tracked_items_in({atom(), rabbit_types:username()}) -> non_neg_integer(). count_tracked_items_in({user, Username}) -> rabbit_tracking:count_tracked_items( - fun tracked_channel_per_user_table_name_for/1, - #tracked_channel_per_user.channel_count, Username, - "channels in vhost"). + ?TRACKED_CHANNEL_TABLE_PER_USER, Username, + "channels in vhost"). -spec clear_tracking_tables() -> ok. @@ -178,109 +166,52 @@ shutdown_tracked_items(TrackedItems, _Args) -> list() -> lists:foldl( fun (Node, Acc) -> - Tab = tracked_channel_table_name_for(Node), - try - Acc ++ - mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'}) - catch - exit:{aborted, {no_exists, [Tab, _]}} -> - %% The table might not exist yet (or is already gone) - %% between the time rabbit_nodes:all_running() runs and - %% returns a specific node, and - %% mnesia:dirty_match_object() is called for that node's - %% table. - Acc - end + list_on_node(Node) ++ Acc end, [], rabbit_nodes:all_running()). -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()]. list_of_user(Username) -> rabbit_tracking:match_tracked_items( - fun tracked_channel_table_name_for/1, - #tracked_channel{username = Username, _ = '_'}). + ?TRACKED_CHANNEL_TABLE, + #tracked_channel{username = Username, _ = '_'}). -spec list_on_node(node()) -> [rabbit_types:tracked_channel()]. +list_on_node(Node) when Node == node() -> + ets:tab2list(?TRACKED_CHANNEL_TABLE); list_on_node(Node) -> - try mnesia:dirty_match_object( - tracked_channel_table_name_for(Node), - #tracked_channel{_ = '_'}) - catch exit:{aborted, {no_exists, _}} -> [] + case rabbit_misc:rpc_call(Node, ets, tab2list, [?TRACKED_CHANNEL_TABLE]) of + List when is_list(List) -> + List; + _ -> + [] end. --spec tracked_channel_table_name_for(node()) -> atom(). - -tracked_channel_table_name_for(Node) -> - list_to_atom(rabbit_misc:format("tracked_channel_on_node_~s", [Node])). - --spec tracked_channel_per_user_table_name_for(node()) -> atom(). - -tracked_channel_per_user_table_name_for(Node) -> - list_to_atom(rabbit_misc:format( - "tracked_channel_table_per_user_on_node_~s", [Node])). - %% internal -ensure_tracked_channels_table_for_this_node() -> - ensure_tracked_channels_table_for_node(node()). - -ensure_per_user_tracked_channels_table_for_node() -> - ensure_per_user_tracked_channels_table_for_node(node()). - -%% Create tables -ensure_tracked_channels_table_for_node(Node) -> - TableName = tracked_channel_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_channel}, - {attributes, record_info(fields, tracked_channel)}]) of - {atomic, ok} -> ok; - {aborted, {already_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to create a tracked channel table for node ~p: ~p", [Node, Error]), - ok - end. - -ensure_per_user_tracked_channels_table_for_node(Node) -> - TableName = tracked_channel_per_user_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_channel_per_user}, - {attributes, record_info(fields, tracked_channel_per_user)}]) of - {atomic, ok} -> ok; - {aborted, {already_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to create a per-user tracked channel table for node ~p: ~p", [Node, Error]), - ok - end. +ensure_tracked_tables_for_this_node() -> + ets:new(?TRACKED_CHANNEL_TABLE, [named_table, public, {write_concurrency, true}, + {keypos, #tracked_connection.id}]), + ets:new(?TRACKED_CHANNEL_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]). clear_tracked_channel_tables_for_this_node() -> [rabbit_tracking:clear_tracking_table(T) - || T <- get_all_tracked_channel_table_names_for_node(node())]. - -delete_tracked_channels_table_for_node(Node) -> - TableName = tracked_channel_table_name_for(Node), - rabbit_tracking:delete_tracking_table(TableName, Node, "tracked channel"). - -delete_per_user_tracked_channels_table_for_node(Node) -> - TableName = tracked_channel_per_user_table_name_for(Node), - rabbit_tracking:delete_tracking_table(TableName, Node, - "per-user tracked channels"). - -get_all_tracked_channel_table_names_for_node(Node) -> - [tracked_channel_table_name_for(Node), - tracked_channel_per_user_table_name_for(Node)]. + || T <- [?TRACKED_CHANNEL_TABLE, ?TRACKED_CHANNEL_TABLE_PER_USER]]. get_tracked_channels_by_connection_pid(ConnPid) -> rabbit_tracking:match_tracked_items( - fun tracked_channel_table_name_for/1, + ?TRACKED_CHANNEL_TABLE, #tracked_channel{connection = ConnPid, _ = '_'}). get_tracked_channel_by_pid(ChPid) -> rabbit_tracking:match_tracked_items( - fun tracked_channel_table_name_for/1, + ?TRACKED_CHANNEL_TABLE, #tracked_channel{pid = ChPid, _ = '_'}). delete_tracked_channel_user_entry(Username) -> rabbit_tracking:delete_tracked_entry( {rabbit_auth_backend_internal, exists, [Username]}, - fun tracked_channel_per_user_table_name_for/1, + ?TRACKED_CHANNEL_TABLE_PER_USER, Username). tracked_channel_from_channel_created_event(ChannelDetails) -> diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index b33d5da18c..098fa9cb87 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -26,23 +26,7 @@ clear_tracking_tables/0, shutdown_tracked_items/2]). --export([ensure_tracked_connections_table_for_node/1, - ensure_per_vhost_tracked_connections_table_for_node/1, - ensure_per_user_tracked_connections_table_for_node/1, - - ensure_tracked_connections_table_for_this_node/0, - ensure_per_vhost_tracked_connections_table_for_this_node/0, - ensure_per_user_tracked_connections_table_for_this_node/0, - - tracked_connection_table_name_for/1, - tracked_connection_per_vhost_table_name_for/1, - tracked_connection_per_user_table_name_for/1, - get_all_tracked_connection_table_names_for_node/1, - - delete_tracked_connections_table_for_node/1, - delete_per_vhost_tracked_connections_table_for_node/1, - delete_per_user_tracked_connections_table_for_node/1, - delete_tracked_connection_user_entry/1, +-export([delete_tracked_connection_user_entry/1, delete_tracked_connection_vhost_entry/1, clear_tracked_connection_tables_for_this_node/0, @@ -53,12 +37,18 @@ lookup/1, count/0]). +-export([ensure_tracked_tables_for_this_node/0]). + -include_lib("rabbit_common/include/rabbit.hrl"). -import(rabbit_misc, [pget/2]). -export([close_connections/3]). +-define(TRACKED_CONNECTION_TABLE, tracked_connection). +-define(TRACKED_CONNECTION_TABLE_PER_USER, tracked_connection_per_user). +-define(TRACKED_CONNECTION_TABLE_PER_VHOST, tracked_connection_per_vhost). + %% %% API %% @@ -70,16 +60,13 @@ %% Sets up and resets connection tracking tables for this %% node. boot() -> - ensure_tracked_connections_table_for_this_node(), + ensure_tracked_tables_for_this_node(), rabbit_log:info("Setting up a table for connection tracking on this node: ~p", - [tracked_connection_table_name_for(node())]), - ensure_per_vhost_tracked_connections_table_for_this_node(), + [?TRACKED_CONNECTION_TABLE]), rabbit_log:info("Setting up a table for per-vhost connection counting on this node: ~p", - [tracked_connection_per_vhost_table_name_for(node())]), - ensure_per_user_tracked_connections_table_for_this_node(), + [?TRACKED_CONNECTION_TABLE_PER_VHOST]), rabbit_log:info("Setting up a table for per-user connection counting on this node: ~p", - [tracked_connection_per_user_table_name_for(node())]), - clear_tracking_tables(), + [?TRACKED_CONNECTION_TABLE_PER_USER]), ok. -spec update_tracked(term()) -> ok. @@ -158,26 +145,19 @@ handle_cast({user_deleted, Details}) -> rabbit_connection_tracking:list_of_user(Username), rabbit_misc:format("user '~s' is deleted", [Username])); %% A node had been deleted from the cluster. -handle_cast({node_deleted, Details}) -> - Node = pget(node, Details), - rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), - delete_tracked_connections_table_for_node(Node), - delete_per_vhost_tracked_connections_table_for_node(Node), - delete_per_user_tracked_connections_table_for_node(Node). +handle_cast({node_deleted, _}) -> + ok. -spec register_tracked(rabbit_types:tracked_connection()) -> ok. -dialyzer([{nowarn_function, [register_tracked/1]}]). register_tracked(#tracked_connection{username = Username, vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), %% upsert - case mnesia:dirty_read(TableName, ConnId) of + case ets:lookup(?TRACKED_CONNECTION_TABLE, ConnId) of [] -> - mnesia:dirty_write(TableName, Conn), - mnesia:dirty_update_counter(PerVhostTableName, VHost, 1), - mnesia:dirty_update_counter(PerUserConnTableName, Username, 1); + ets:insert(?TRACKED_CONNECTION_TABLE, Conn), + ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_VHOST, VHost, 1, {VHost, 0}), + ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_USER, Username, 1, {Username, 0}); [#tracked_connection{}] -> ok end, @@ -186,28 +166,25 @@ register_tracked(#tracked_connection{username = Username, vhost = VHost, id = Co -spec unregister_tracked(rabbit_types:tracked_connection_id()) -> ok. unregister_tracked(ConnId = {Node, _Name}) when Node =:= node() -> - TableName = tracked_connection_table_name_for(Node), - PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node), - PerUserConnTableName = tracked_connection_per_user_table_name_for(Node), - case mnesia:dirty_read(TableName, ConnId) of + case ets:lookup(?TRACKED_CONNECTION_TABLE, ConnId) of [] -> ok; [#tracked_connection{vhost = VHost, username = Username}] -> - mnesia:dirty_update_counter(PerUserConnTableName, Username, -1), - mnesia:dirty_update_counter(PerVhostTableName, VHost, -1), - mnesia:dirty_delete(TableName, ConnId) + ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_USER, Username, -1), + ets:update_counter(?TRACKED_CONNECTION_TABLE_PER_VHOST, VHost, -1), + ets:delete(?TRACKED_CONNECTION_TABLE, ConnId) end. -spec count_tracked_items_in({atom(), rabbit_types:vhost()}) -> non_neg_integer(). count_tracked_items_in({vhost, VirtualHost}) -> rabbit_tracking:count_tracked_items( - fun tracked_connection_per_vhost_table_name_for/1, - #tracked_connection_per_vhost.connection_count, VirtualHost, + ?TRACKED_CONNECTION_TABLE_PER_VHOST, + VirtualHost, "connections in vhost"); count_tracked_items_in({user, Username}) -> rabbit_tracking:count_tracked_items( - fun tracked_connection_per_user_table_name_for/1, - #tracked_connection_per_user.connection_count, Username, + ?TRACKED_CONNECTION_TABLE_PER_USER, + Username, "connections for user"). -spec clear_tracking_tables() -> ok. @@ -222,114 +199,22 @@ shutdown_tracked_items(TrackedItems, Message) -> %% Extended API --spec ensure_tracked_connections_table_for_this_node() -> ok. - -ensure_tracked_connections_table_for_this_node() -> - ensure_tracked_connections_table_for_node(node()). - - --spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok. - -ensure_per_vhost_tracked_connections_table_for_this_node() -> - ensure_per_vhost_tracked_connections_table_for_node(node()). - - --spec ensure_per_user_tracked_connections_table_for_this_node() -> ok. - -ensure_per_user_tracked_connections_table_for_this_node() -> - ensure_per_user_tracked_connections_table_for_node(node()). - - %% Create tables --spec ensure_tracked_connections_table_for_node(node()) -> ok. - -ensure_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_connection}, - {attributes, record_info(fields, tracked_connection)}]) of - {atomic, ok} -> ok; - {aborted, {already_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to create a tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. - --spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok. - -ensure_per_vhost_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_per_vhost_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost}, - {attributes, record_info(fields, tracked_connection_per_vhost)}]) of - {atomic, ok} -> ok; - {aborted, {already_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to create a per-vhost tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. - --spec ensure_per_user_tracked_connections_table_for_node(node()) -> ok. - -ensure_per_user_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_per_user_table_name_for(Node), - case mnesia:create_table(TableName, [{record_name, tracked_connection_per_user}, - {attributes, record_info(fields, tracked_connection_per_user)}]) of - {atomic, ok} -> ok; - {aborted, {already_exists, _}} -> ok; - {aborted, Error} -> - rabbit_log:error("Failed to create a per-user tracked connection table for node ~p: ~p", [Node, Error]), - ok - end. +ensure_tracked_tables_for_this_node() -> + ets:new(?TRACKED_CONNECTION_TABLE, [named_table, public, {write_concurrency, true}, + {keypos, #tracked_connection.id}]), + ets:new(?TRACKED_CONNECTION_TABLE_PER_USER, [named_table, public, {write_concurrency, true}]), + ets:new(?TRACKED_CONNECTION_TABLE_PER_VHOST, [named_table, public, {write_concurrency, true}]). -spec clear_tracked_connection_tables_for_this_node() -> ok. clear_tracked_connection_tables_for_this_node() -> [rabbit_tracking:clear_tracking_table(T) - || T <- get_all_tracked_connection_table_names_for_node(node())], + || T <- [?TRACKED_CONNECTION_TABLE, + ?TRACKED_CONNECTION_TABLE_PER_USER, + ?TRACKED_CONNECTION_TABLE_PER_VHOST]], ok. --spec delete_tracked_connections_table_for_node(node()) -> ok. - -delete_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_table_name_for(Node), - rabbit_tracking:delete_tracking_table(TableName, Node, "tracked connection"). - --spec delete_per_vhost_tracked_connections_table_for_node(node()) -> ok. - -delete_per_vhost_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_per_vhost_table_name_for(Node), - rabbit_tracking:delete_tracking_table(TableName, Node, - "per-vhost tracked connection"). - --spec delete_per_user_tracked_connections_table_for_node(node()) -> ok. - -delete_per_user_tracked_connections_table_for_node(Node) -> - TableName = tracked_connection_per_user_table_name_for(Node), - rabbit_tracking:delete_tracking_table(TableName, Node, - "per-user tracked connection"). - --spec tracked_connection_table_name_for(node()) -> atom(). - -tracked_connection_table_name_for(Node) -> - list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])). - --spec tracked_connection_per_vhost_table_name_for(node()) -> atom(). - -tracked_connection_per_vhost_table_name_for(Node) -> - list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])). - --spec tracked_connection_per_user_table_name_for(node()) -> atom(). - -tracked_connection_per_user_table_name_for(Node) -> - list_to_atom(rabbit_misc:format( - "tracked_connection_table_per_user_on_node_~s", [Node])). - --spec get_all_tracked_connection_table_names_for_node(node()) -> [atom()]. - -get_all_tracked_connection_table_names_for_node(Node) -> - [tracked_connection_table_name_for(Node), - tracked_connection_per_vhost_table_name_for(Node), - tracked_connection_per_user_table_name_for(Node)]. - -spec lookup(rabbit_types:connection_name()) -> rabbit_types:tracked_connection() | 'not_found'. lookup(Name) -> @@ -338,11 +223,15 @@ lookup(Name) -> lookup(_, []) -> not_found; -lookup(Name, [Node | Nodes]) -> - TableName = tracked_connection_table_name_for(Node), - case mnesia:dirty_read(TableName, {Node, Name}) of +lookup(Name, [Node | Nodes]) when Node == node() -> + case ets:lookup(?TRACKED_CONNECTION_TABLE, {Node, Name}) of [] -> lookup(Name, Nodes); [Row] -> Row + end; +lookup(Name, [Node | Nodes]) -> + case rabbit_misc:rpc_call(Node, ets, lookup, [?TRACKED_CONNECTION_TABLE, {Node, Name}]) of + [Row] -> Row; + _ -> lookup(Name, Nodes) end. -spec list() -> [rabbit_types:tracked_connection()]. @@ -350,78 +239,77 @@ lookup(Name, [Node | Nodes]) -> list() -> lists:foldl( fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - try - Acc ++ - mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'}) - catch - exit:{aborted, {no_exists, [Tab, _]}} -> - %% The table might not exist yet (or is already gone) - %% between the time rabbit_nodes:all_running() runs and - %% returns a specific node, and - %% mnesia:dirty_match_object() is called for that node's - %% table. - Acc - end + list_on_node(Node) ++ Acc end, [], rabbit_nodes:all_running()). -spec count() -> non_neg_integer(). count() -> lists:foldl( - fun (Node, Acc) -> - Tab = tracked_connection_table_name_for(Node), - %% mnesia:table_info() returns 0 if the table doesn't exist. We - %% don't need the same kind of protection as the list() function - %% above. - Acc + mnesia:table_info(Tab, size) + fun (Node, Acc) when Node == node() -> + Acc + ets:info(?TRACKED_CONNECTION_TABLE, size); + (Node, Acc) -> + case rabbit_misc:rpc_call(Node, ets, info, [?TRACKED_CONNECTION_TABLE, size]) of + Int when is_integer(Int) -> + Acc + Int; + _ -> + Acc + end end, 0, rabbit_nodes:all_running()). -spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. list(VHost) -> rabbit_tracking:match_tracked_items( - fun tracked_connection_table_name_for/1, + ?TRACKED_CONNECTION_TABLE, #tracked_connection{vhost = VHost, _ = '_'}). -spec list_on_node(node()) -> [rabbit_types:tracked_connection()]. +list_on_node(Node) when Node == node() -> + ets:tab2list(?TRACKED_CONNECTION_TABLE); list_on_node(Node) -> - try mnesia:dirty_match_object( - tracked_connection_table_name_for(Node), - #tracked_connection{_ = '_'}) - catch exit:{aborted, {no_exists, _}} -> [] + case rabbit_misc:rpc_call(Node, ets, tab2list, [?TRACKED_CONNECTION_TABLE]) of + List when is_list(List) -> + List; + _ -> + [] end. -spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. +list_on_node(Node, VHost) when Node == node() -> + ets:match_object(?TRACKED_CONNECTION_TABLE, + #tracked_connection{vhost = VHost, _ = '_'}); list_on_node(Node, VHost) -> - try mnesia:dirty_match_object( - tracked_connection_table_name_for(Node), - #tracked_connection{vhost = VHost, _ = '_'}) - catch exit:{aborted, {no_exists, _}} -> [] + case rabbit_misc:rpc_call(Node, ets, match_object, + [?TRACKED_CONNECTION_TABLE, + #tracked_connection{vhost = VHost, _ = '_'}]) of + List when is_list(List) -> + List; + _ -> + [] end. - - + -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. list_of_user(Username) -> rabbit_tracking:match_tracked_items( - fun tracked_connection_table_name_for/1, - #tracked_connection{username = Username, _ = '_'}). + ?TRACKED_CONNECTION_TABLE, + #tracked_connection{username = Username, _ = '_'}). %% Internal, delete tracked entries delete_tracked_connection_vhost_entry(Vhost) -> rabbit_tracking:delete_tracked_entry( {rabbit_vhost, exists, [Vhost]}, - fun tracked_connection_per_vhost_table_name_for/1, + ?TRACKED_CONNECTION_TABLE_PER_VHOST, Vhost). delete_tracked_connection_user_entry(Username) -> rabbit_tracking:delete_tracked_entry( {rabbit_auth_backend_internal, exists, [Username]}, - fun tracked_connection_per_user_table_name_for/1, + ?TRACKED_CONNECTION_TABLE_PER_USER, Username). %% Returns a #tracked_connection from connection_created diff --git a/deps/rabbit/src/rabbit_tracking.erl b/deps/rabbit/src/rabbit_tracking.erl index 87b0e7bb4d..cea1e89a70 100644 --- a/deps/rabbit/src/rabbit_tracking.erl +++ b/deps/rabbit/src/rabbit_tracking.erl @@ -26,7 +26,7 @@ -callback clear_tracking_tables() -> 'ok'. -callback shutdown_tracked_items(list(), term()) -> ok. --export([id/2, count_tracked_items/4, match_tracked_items/2, +-export([id/2, count_tracked_items/3, match_tracked_items/2, clear_tracking_table/1, delete_tracking_table/3, delete_tracked_entry/3]). @@ -37,53 +37,66 @@ id(Node, Name) -> {Node, Name}. --spec count_tracked_items(function(), integer(), term(), string()) -> +-spec count_tracked_items(function(), term(), string()) -> non_neg_integer(). -count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) -> - lists:foldl(fun (Node, Acc) -> - Tab = TableNameFun(Node), - try - N = case mnesia:dirty_read(Tab, Key) of - [] -> 0; - [Val] -> - element(CountRecPosition, Val) - end, - Acc + N - catch _:Err -> - rabbit_log:error( - "Failed to fetch number of ~p ~p on node ~p:~n~p", - [ContextMsg, Key, Node, Err]), - Acc - end +count_tracked_items(Tab, Key, ContextMsg) -> + lists:foldl(fun (Node, Acc) when Node == node() -> + N = case ets:lookup(Tab, Key) of + [] -> 0; + [{_, Val}] -> Val + end, + Acc + N; + (Node, Acc) -> + N = case rabbit_misc:rpc_call(Node, ets, lookup, [Tab, Key]) of + [] -> 0; + [{_, Val}] -> Val; + {badrpc, Err} -> + rabbit_log:error( + "Failed to fetch number of ~p ~p on node ~p:~n~p", + [ContextMsg, Key, Node, Err]), + 0 + end, + Acc + N end, 0, rabbit_nodes:all_running()). -spec match_tracked_items(function(), tuple()) -> term(). -match_tracked_items(TableNameFun, MatchSpec) -> +match_tracked_items(Tab, MatchSpec) -> lists:foldl( - fun (Node, Acc) -> - Tab = TableNameFun(Node), - Acc ++ mnesia:dirty_match_object( + fun (Node, Acc) when Node == node() -> + Acc ++ ets:match_object( Tab, - MatchSpec) + MatchSpec); + (Node, Acc) -> + case rabbit_misc:rpc_call(Node, ets, match_object, [Tab, MatchSpec]) of + List when is_list(List) -> + Acc ++ List; + _ -> + Acc + end end, [], rabbit_nodes:all_running()). -spec clear_tracking_table(atom()) -> ok. clear_tracking_table(TableName) -> - case mnesia:clear_table(TableName) of - {atomic, ok} -> ok; - {aborted, _} -> ok + try + true = ets:delete_all_objects(TableName), + ok + catch + error:badarg -> + ok end. -spec delete_tracking_table(atom(), node(), string()) -> ok. +delete_tracking_table(TableName, Node, _ContextMsg) when Node == node() -> + true = ets:delete(TableName), + ok; delete_tracking_table(TableName, Node, ContextMsg) -> - case mnesia:delete_table(TableName) of - {atomic, ok} -> ok; - {aborted, {no_exists, _}} -> ok; - {aborted, Error} -> + case rabbit_misc:rpc_call(Node, ets, delete, [TableName]) of + true -> ok; + {badrpc, Error} -> rabbit_log:error("Failed to delete a ~p table for node ~p: ~p", [ContextMsg, Node, Error]), ok @@ -91,13 +104,19 @@ delete_tracking_table(TableName, Node, ContextMsg) -> -spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok. -delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) -> +delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableName, Key) -> ClusterNodes = rabbit_nodes:all_running(), ExistsInCluster = lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes), case ExistsInCluster of false -> - [mnesia:dirty_delete(TableNameFun(Node), Key) || Node <- ClusterNodes]; + [delete_tracked_entry0(Node, TableName, Key) || Node <- ClusterNodes]; true -> ok end. + +delete_tracked_entry0(Node, Tab, Key) when Node == node() -> + true = ets:delete(Tab, Key); +delete_tracked_entry0(Node, Tab, Key) -> + _ = rabbit_misc:rpc_call(Node, ets, delete, [Tab, Key]), + ok. diff --git a/deps/rabbit/src/rabbit_tracking_store.erl b/deps/rabbit/src/rabbit_tracking_store.erl new file mode 100644 index 0000000000..49da0bfb0a --- /dev/null +++ b/deps/rabbit/src/rabbit_tracking_store.erl @@ -0,0 +1,47 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_tracking_store). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, format_status/2]). + +-define(SERVER, ?MODULE). + +-record(state, {}). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + rabbit_connection_tracking:ensure_tracked_tables_for_this_node(), + rabbit_channel_tracking:ensure_tracked_tables_for_this_node(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +format_status(_Opt, Status) -> + Status. diff --git a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl index 79190dc2ff..9fde0465e1 100644 --- a/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_tracking_SUITE.erl @@ -652,19 +652,6 @@ cluster_node_removed(Config) -> rabbit_ct_broker_helpers:forget_cluster_node(Config, 0, 1), timer:sleep(200), - NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - - DroppedConnTrackingTables = - rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(NodeName), - [?assertEqual( - {'EXIT', {aborted, {no_exists, Tab, all}}}, - catch mnesia:table_info(Tab, all)) || Tab <- DroppedConnTrackingTables], - - DroppedChTrackingTables = - rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(NodeName), - [?assertEqual( - {'EXIT', {aborted, {no_exists, Tab, all}}}, - catch mnesia:table_info(Tab, all)) || Tab <- DroppedChTrackingTables], ?assertEqual(false, is_process_alive(Conn2)), [?assertEqual(false, is_process_alive(Ch)) || Ch <- Chans2], @@ -769,31 +756,26 @@ exists_in_tracked_connection_per_vhost_table(Config, VHost) -> exists_in_tracked_connection_per_vhost_table(Config, 0, VHost). exists_in_tracked_connection_per_vhost_table(Config, NodeIndex, VHost) -> exists_in_tracking_table(Config, NodeIndex, - fun rabbit_connection_tracking:tracked_connection_per_vhost_table_name_for/1, - VHost). + tracked_connection_per_vhost, + VHost). exists_in_tracked_connection_per_user_table(Config, Username) -> exists_in_tracked_connection_per_user_table(Config, 0, Username). exists_in_tracked_connection_per_user_table(Config, NodeIndex, Username) -> exists_in_tracking_table(Config, NodeIndex, - fun rabbit_connection_tracking:tracked_connection_per_user_table_name_for/1, - Username). + tracked_connection_per_user, + Username). exists_in_tracked_channel_per_user_table(Config, Username) -> exists_in_tracked_channel_per_user_table(Config, 0, Username). exists_in_tracked_channel_per_user_table(Config, NodeIndex, Username) -> exists_in_tracking_table(Config, NodeIndex, - fun rabbit_channel_tracking:tracked_channel_per_user_table_name_for/1, - Username). - -exists_in_tracking_table(Config, NodeIndex, TableNameFun, Key) -> - Node = rabbit_ct_broker_helpers:get_node_config( - Config, NodeIndex, nodename), - Tab = TableNameFun(Node), - AllKeys = rabbit_ct_broker_helpers:rpc(Config, NodeIndex, - mnesia, - dirty_all_keys, [Tab]), - lists:member(Key, AllKeys). + tracked_connection_per_user, + Username). + +exists_in_tracking_table(Config, NodeIndex, Tab, Key) -> + All = rabbit_ct_broker_helpers:rpc(Config, NodeIndex, ets, tab2list, [Tab]), + lists:keymember(Key, 1, All). mimic_vhost_down(Config, NodeIndex, VHost) -> rabbit_ct_broker_helpers:rpc(Config, NodeIndex, |