summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_connection_tracker.erl99
-rw-r--r--src/rabbit_connection_tracking.erl186
-rw-r--r--src/rabbit_connection_tracking_handler.erl38
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_node_monitor.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl15
7 files changed, 163 insertions, 187 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 59ede3c802..5db2c40b66 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -178,9 +178,9 @@
{mfa, {rabbit_direct, boot, []}},
{requires, log_relay}]}).
--rabbit_boot_step({connection_tracker,
- [{description, "helps track node-local connections"},
- {mfa, {rabbit_connection_tracker, boot, []}},
+-rabbit_boot_step({connection_tracking,
+ [{description, "sets up internal storage for node-local connections"},
+ {mfa, {rabbit_connection_tracking, boot, []}},
{requires, log_relay}]}).
-rabbit_boot_step({networking,
diff --git a/src/rabbit_connection_tracker.erl b/src/rabbit_connection_tracker.erl
deleted file mode 100644
index 0177627d83..0000000000
--- a/src/rabbit_connection_tracker.erl
+++ /dev/null
@@ -1,99 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_connection_tracker).
-
-%% Abstracts away how tracked connection records are stored
-%% and queried.
-%%
-%% See also:
-%%
-%% * rabbit_connection_tracking_handler
-%% * rabbit_reader
-%% * rabbit_event
-
--behaviour(gen_server2).
-
-%% API
--export([boot/0, start_link/0, reregister/1]).
-
-%% gen_fsm callbacks
--export([init/1,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
-
--define(SERVER, ?MODULE).
-
-
-%%%===================================================================
-%%% API
-%%%===================================================================
-
-boot() ->
- {ok, _} = start_link(),
- ok.
-
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-reregister(Node) ->
- rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]),
- gen_server2:cast({?SERVER, Node}, reregister).
-
-%%%===================================================================
-%%% gen_server callbacks
-%%%===================================================================
-
-init([]) ->
- {ok, {}}.
-
-handle_call(_Req, _From, State) ->
- {noreply, State}.
-
-handle_cast(reregister, State) ->
- Cs = rabbit_networking:connections_local(),
- rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]),
- case Cs of
- [] -> ok;
- Cs ->
- [reregister_connection(C) || C <- Cs],
- ok
- end,
- rabbit_log:info("Done re-registering client connections"),
- {noreply, State}.
-
-handle_info(_Req, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%%===================================================================
-%%% Internal functions
-%%%===================================================================
-
-reregister_connection(Conn) ->
- try
- Conn ! reregister
- catch _:Error ->
- rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error])
- end.
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index f059c2698b..17e2d79034 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -25,34 +25,117 @@
%% * rabbit_reader
%% * rabbit_event
--export([register_connection/1, unregister_connection/1,
+-export([boot/0,
+ ensure_tracked_connections_table_for_node/1,
+ ensure_per_vhost_tracked_connections_table_for_node/1,
+ ensure_tracked_connections_table_for_this_node/0,
+ ensure_per_vhost_tracked_connections_table_for_this_node/0,
+ tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1,
+ clear_tracked_connections_table_for_this_node/0,
+ register_connection/1, unregister_connection/1,
list/0, list/1, list_on_node/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
- is_over_connection_limit/1, count_connections_in/1,
- on_node_down/1, on_node_up/1]).
+ is_over_connection_limit/1, count_connections_in/1]).
-include_lib("rabbit.hrl").
--define(TABLE, rabbit_tracked_connection).
--define(PER_VHOST_COUNTER_TABLE, rabbit_tracked_connection_per_vhost).
--define(SERVER, ?MODULE).
-
%%
%% API
%%
+-spec boot() -> ok.
+
+%% Sets up and resets connection tracking tables for this
+%% node.
+boot() ->
+ ensure_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Created a table for connection tracking on this node: ~p",
+ [tracked_connection_table_name_for(node())]),
+ ensure_per_vhost_tracked_connections_table_for_this_node(),
+ rabbit_log:info("Created a table for per-vhost connection counting on this node: ~p",
+ [tracked_connection_per_vhost_table_name_for(node())]),
+ clear_tracked_connections_table_for_this_node(),
+ ok.
+
+
+-spec ensure_tracked_connections_table_for_this_node() -> ok.
+
+ensure_tracked_connections_table_for_this_node() ->
+ ensure_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_this_node() -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_this_node() ->
+ ensure_per_vhost_tracked_connections_table_for_node(node()).
+
+
+-spec ensure_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection},
+ {attributes, [id, node, vhost, name,
+ pid, protocol,
+ peer_host, peer_port,
+ username, connected_at]}]) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ %% TODO: propagate errors
+ end.
+
+
+-spec ensure_per_vhost_tracked_connections_table_for_node(node()) -> ok.
+
+ensure_per_vhost_tracked_connections_table_for_node(Node) ->
+ TableName = tracked_connection_per_vhost_table_name_for(Node),
+ case mnesia:create_table(TableName, [{record_name, tracked_connection_per_vhost},
+ {attributes, [vhost, connection_count]}]) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ %% TODO: propagate errors
+ end.
+
+
+-spec clear_tracked_connections_table_for_this_node() -> ok.
+
+clear_tracked_connections_table_for_this_node() ->
+ case mnesia:clear_table(tracked_connection_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end,
+ case mnesia:clear_table(tracked_connection_per_vhost_table_name_for(node())) of
+ {atomic, ok} -> ok;
+ {aborted, _} -> ok
+ end.
+
+
+-spec tracked_connection_table_name_for(node()) -> atom().
+
+tracked_connection_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_on_node_~s", [Node])).
+
+-spec tracked_connection_per_vhost_table_name_for(node()) -> atom().
+
+tracked_connection_per_vhost_table_name_for(Node) ->
+ list_to_atom(rabbit_misc:format("tracked_connection_per_vhost_on_node_~s", [Node])).
+
+
-spec register_connection(rabbit_types:tracked_connection()) -> ok.
-register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
+register_connection(#tracked_connection{vhost = VHost, id = ConnId, node = Node} = Conn) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
rabbit_misc:execute_mnesia_transaction(
fun() ->
%% upsert
- case mnesia:dirty_read(?TABLE, ConnId) of
+ case mnesia:dirty_read(TableName, ConnId) of
[] ->
- mnesia:write(?TABLE, Conn, write),
+ %% TODO: counter table
+ mnesia:write(TableName, Conn, write),
mnesia:dirty_update_counter(
- ?PER_VHOST_COUNTER_TABLE, VHost, 1);
+ PerVhostTableName, VHost, 1);
[_Row] ->
ok
end,
@@ -61,16 +144,17 @@ register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
-spec unregister_connection(rabbit_types:connection_name()) -> ok.
-unregister_connection(ConnId = {_Node, _Name}) ->
+unregister_connection(ConnId = {Node, _Name}) when Node =:= node() ->
+ TableName = tracked_connection_table_name_for(Node),
+ PerVhostTableName = tracked_connection_per_vhost_table_name_for(Node),
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:dirty_read(?TABLE, ConnId) of
- [] -> ok;
+ case mnesia:dirty_read(TableName, ConnId) of
+ [] -> ok;
[Row] ->
mnesia:dirty_update_counter(
- ?PER_VHOST_COUNTER_TABLE,
- Row#tracked_connection.vhost, -1),
- mnesia:delete({?TABLE, ConnId})
+ PerVhostTableName, Row#tracked_connection.vhost, -1),
+ mnesia:delete({TableName, ConnId})
end
end).
@@ -78,40 +162,33 @@ unregister_connection(ConnId = {_Node, _Name}) ->
-spec list() -> [rabbit_types:tracked_connection()].
list() ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{_ = '_'}).
+ Chunks = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_table_name_for(Node),
+ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(Chunk, Acc) -> Acc ++ Chunk end, [], Chunks).
-spec list(rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
list(VHost) ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{vhost = VHost, _ = '_'}).
+ Chunks = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_table_name_for(Node),
+ mnesia:dirty_match_object(Tab, #tracked_connection{vhost = VHost, _ = '_'})
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(Chunk, Acc) -> Acc ++ Chunk end, [], Chunks).
-spec list_on_node(node()) -> [rabbit_types:tracked_connection()].
list_on_node(Node) ->
- mnesia:dirty_match_object(?TABLE, #tracked_connection{node = Node, _ = '_'}).
-
-
--spec on_node_down(node()) -> ok.
-
-on_node_down(Node) ->
- case lists:member(Node, nodes()) of
- false ->
- Cs = list_on_node(Node),
- rabbit_log:info(
- "Node ~p is down, unregistering ~p client connections~n",
- [Node, length(Cs)]),
- [unregister_connection(Id) || #tracked_connection{id = Id} <- Cs],
- ok;
- true -> rabbit_log:info(
- "Keeping ~s connections: the node is already back~n", [Node])
- end.
-
--spec on_node_up(node()) -> ok.
-on_node_up(Node) ->
- rabbit_connection_tracker:reregister(Node),
- ok.
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{_ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
-spec is_over_connection_limit(rabbit_types:vhost()) -> {true, non_neg_integer()} | false.
@@ -133,19 +210,24 @@ is_over_connection_limit(VirtualHost) ->
count_connections_in(VirtualHost) ->
try
- case mnesia:transaction(
+ Ns = lists:map(
+ fun (Node) ->
+ Tab = tracked_connection_per_vhost_table_name_for(Node),
+ try
+ case mnesia:transaction(
fun() ->
- case mnesia:dirty_read(
- {?PER_VHOST_COUNTER_TABLE,
- VirtualHost}) of
- [] -> 0;
- [Val] ->
- Val#tracked_connection_per_vhost.connection_count
- end
+ case mnesia:dirty_read({Tab, VirtualHost}) of
+ [] -> 0;
+ [Val] -> Val#tracked_connection_per_vhost.connection_count
+ end
end) of
- {atomic, Val} -> Val;
- {aborted, _Reason} -> 0
- end
+ {atomic, Val} -> Val;
+ {aborted, _Reason} -> 0
+ end
+ catch _ -> 0
+ end
+ end, rabbit_mnesia:cluster_nodes(running)),
+ lists:foldl(fun(X, Acc) -> Acc + X end, 0, Ns)
catch
_:Err ->
rabbit_log:error(
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index deaf9bc7f6..91b38d2c39 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -53,23 +53,31 @@ init([]) ->
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->
- rabbit_connection_tracking:register_connection(
- rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
- ),
- {ok, State};
-%% see rabbit_reader
-handle_event(#event{type = connection_reregistered, props = [{state, ConnState}]}, State) ->
- rabbit_connection_tracking:register_connection(
- rabbit_connection_tracking:tracked_connection_from_connection_state(ConnState)
- ),
+ ThisNode = node(),
+ case proplists:get_value(node, Details) of
+ ThisNode ->
+ rabbit_connection_tracking:register_connection(
+ rabbit_connection_tracking:tracked_connection_from_connection_created(Details)
+ );
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
- %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
- %% {pid,<0.1774.0>},
- %% {node, rabbit@hostname}]
- rabbit_connection_tracking:unregister_connection(
- {proplists:get_value(node, Details),
- proplists:get_value(name, Details)}),
+ ThisNode = node(),
+ case proplists:get_value(node, Details) of
+ ThisNode ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ rabbit_connection_tracking:unregister_connection(
+ {proplists:get_value(node, Details),
+ proplists:get_value(name, Details)});
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
{ok, State};
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
VHost = proplists:get_value(name, Details),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 43d268c374..88ea3a80f5 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -148,6 +148,7 @@ auto_cluster(TryNodes, NodeType) ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
init_db_and_upgrade(DiscNodes, NodeType, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning(
@@ -194,6 +195,7 @@ join_cluster(DiscoveryNode, NodeType) ->
[ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType,
true, true),
+ rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster(),
ok;
{error, Reason} ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 9ed790c75d..0322aacfd1 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -732,7 +732,6 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node),
- ok = rabbit_connection_tracking:on_node_down(Node),
%% If we have been partitioned, and we are now in the only remaining
%% partition, we no longer care about partitions - forget them. Note
%% that we do not attempt to deal with individual (other) partitions
@@ -761,8 +760,7 @@ ensure_keepalive_timer(State) ->
handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node),
- ok = rabbit_connection_tracking:on_node_up(Node).
+ ok = rabbit_mnesia:on_node_up(Node).
maybe_autoheal(State = #state{partitions = []}) ->
State;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 47053fafe7..7efa36d637 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -56,8 +56,6 @@
-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
-rabbit_upgrade({vhost_limits, mnesia, []}).
--rabbit_upgrade({tracked_connection, mnesia, [vhost_limits]}).
--rabbit_upgrade({tracked_connection_per_vhost, mnesia, [tracked_connection]}).
%% -------------------------------------------------------------------
@@ -92,23 +90,10 @@
-spec recoverable_slaves() -> 'ok'.
-spec user_password_hashing() -> 'ok'.
-spec vhost_limits() -> 'ok'.
--spec tracked_connection() -> 'ok'.
--spec tracked_connection_per_vhost() -> 'ok'.
%%--------------------------------------------------------------------
-tracked_connection() ->
- create(rabbit_tracked_connection, [{record_name, tracked_connection},
- {attributes, [id, node, vhost, name,
- pid, protocol,
- peer_host, peer_port,
- username, connected_at]}]).
-
-tracked_connection_per_vhost() ->
- create(tracked_connection_per_vhost, [{record_name, tracked_connection_per_vhost},
- {attributes, [vhost, connection_count]}]).
-
%% replaces vhost.dummy (used to avoid having a single-field record
%% which Mnesia doesn't like) with vhost.limits (which is actually
%% used)