summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAyanda-D <ayanda.dube@erlang-solutions.com>2020-06-04 11:54:48 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-09-02 04:28:57 +0300
commit4cf908f0443cf630742564d7fc67ebff13c360f0 (patch)
treedfc7445b02da19f4445e245837694f6b73e5b47c
parent06becbea1e78d784dfda0a55460008c5b7b39460 (diff)
downloadrabbitmq-server-git-4cf908f0443cf630742564d7fc67ebff13c360f0.tar.gz
Add rabbit_tracking behaviour
-rw-r--r--src/rabbit_tracking.erl112
1 files changed, 112 insertions, 0 deletions
diff --git a/src/rabbit_tracking.erl b/src/rabbit_tracking.erl
new file mode 100644
index 0000000000..f4cb647af2
--- /dev/null
+++ b/src/rabbit_tracking.erl
@@ -0,0 +1,112 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_tracking).
+
+%% Common behaviour and processing functions for tracking components
+%%
+%% See in use:
+%% * rabbit_connection_tracking
+%% * rabbit_channel_tracking
+
+-callback boot() -> ok.
+-callback update_tracked(term()) -> ok.
+-callback handle_cast(term()) -> ok.
+-callback register_tracked(
+ rabbit_types:tracked_connection() |
+ rabbit_types:tracked_channel()) -> 'ok'.
+-callback unregister_tracked(
+ rabbit_types:tracked_connection_id() |
+ rabbit_types:tracked_channel_id()) -> 'ok'.
+-callback count_tracked_items_in(term()) -> non_neg_integer().
+-callback clear_tracking_tables() -> 'ok'.
+-callback shutdown_tracked_items(list(), term()) -> ok.
+
+-export([id/2, count_tracked_items/4, match_tracked_items/2,
+ clear_tracking_table/1, delete_tracking_table/3,
+ delete_tracked_entry/3]).
+
+%%----------------------------------------------------------------------------
+
+-spec id(atom(), term()) ->
+ rabbit_types:tracked_connection_id() | rabbit_types:tracked_channel_id().
+
+id(Node, Name) -> {Node, Name}.
+
+-spec count_tracked_items(function(), integer(), term(), string()) ->
+ non_neg_integer().
+
+count_tracked_items(TableNameFun, CountRecPosition, Key, ContextMsg) ->
+ lists:foldl(fun (Node, Acc) ->
+ Tab = TableNameFun(Node),
+ try
+ N = case mnesia:dirty_read(Tab, Key) of
+ [] -> 0;
+ [Val] ->
+ element(CountRecPosition, Val)
+ end,
+ Acc + N
+ catch _:Err ->
+ rabbit_log:error(
+ "Failed to fetch number of ~p ~p on node ~p:~n~p~n",
+ [ContextMsg, Key, Node, Err]),
+ Acc
+ end
+ end, 0, rabbit_mnesia:cluster_nodes(running)).
+
+-spec match_tracked_items(function(), tuple()) -> term().
+
+match_tracked_items(TableNameFun, MatchSpec) ->
+ lists:foldl(
+ fun (Node, Acc) ->
+ Tab = TableNameFun(Node),
+ Acc ++ mnesia:dirty_match_object(
+ Tab,
+ MatchSpec)
+ end, [], rabbit_mnesia:cluster_nodes(running)).
+
+-spec clear_tracking_table(atom()) -> ok.
+
+clear_tracking_table(TableName) ->
+ case mnesia:clear_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end.
+
+-spec delete_tracking_table(atom(), node(), string()) -> ok.
+
+delete_tracking_table(TableName, Node, ContextMsg) ->
+ case mnesia:delete_table(TableName) of
+ {atomic, ok} -> ok;
+ {aborted, {no_exists, _}} -> ok;
+ {aborted, Error} ->
+ rabbit_log:error("Failed to delete a ~p table for node ~p: ~p",
+ [ContextMsg, Node, Error]),
+ ok
+ end.
+
+-spec delete_tracked_entry({atom(), atom(), list()}, function(), term()) -> ok.
+
+delete_tracked_entry(_ExistsCheckSpec = {M, F, A}, TableNameFun, Key) ->
+ ClusterNodes = rabbit_mnesia:cluster_nodes(running),
+ ExistsInCluster =
+ lists:any(fun(Node) -> rpc:call(Node, M, F, A) end, ClusterNodes),
+ case ExistsInCluster of
+ false ->
+ [mnesia:dirty_delete(TableNameFun(Node), Key) || Node <- ClusterNodes];
+ true ->
+ ok
+ end.