summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-04-05 17:44:27 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-04-05 17:44:27 -0400
commitd1b893fe637638cba5984ac3653f8148ec0e517d (patch)
tree8d5dda00b8bbe1be8dbb7248f03c3c35b55bda28 /src
parent09ab089066e8148ebe30ce3183958ea039470826 (diff)
downloadrabbitmq-server-git-d1b893fe637638cba5984ac3653f8148ec0e517d.tar.gz
Handle connection tracking events in a separate process.
Connection tracking event handling may involve DB operations and closing of connections (e.g. for vhost deletion). Because gen_event handles all events in the same process - this may cause message accumulation in the rabbit_event process and memory accumulation. Connection tracking process will only receive connection tracking events and all other events will be dropped, freeing the memory. Follow-up to #1722 [#164353033]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_connection_tracking.erl134
-rw-r--r--src/rabbit_connection_tracking_handler.erl98
2 files changed, 147 insertions, 85 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index cdfc0005ee..34c12f41cb 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -16,6 +16,8 @@
-module(rabbit_connection_tracking).
+-behaviour(gen_server).
+
%% Abstracts away how tracked connection records are stored
%% and queried.
%%
@@ -43,6 +45,110 @@
-import(rabbit_misc, [pget/2]).
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([close_connections/3]).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%%
+%% GenServer API
+%%
+
+init([]) ->
+ {ok, nostate}.
+
+handle_call(_Msg, _From, nostate) ->
+ {reply, ok, nostate}.
+
+
+handle_cast({connection_created, Details}, nostate) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ TConn = tracked_connection_from_connection_created(Details),
+ ConnId = TConn#tracked_connection.id,
+ try
+ register_connection(TConn)
+ catch
+ error:{no_exists, _} ->
+ Msg = "Could not register connection ~p for tracking, "
+ "its table is not ready yet or the connection terminated prematurely",
+ rabbit_log_connection:warning(Msg, [ConnId]),
+ ok;
+ error:Err ->
+ Msg = "Could not register connection ~p for tracking: ~p",
+ rabbit_log_connection:warning(Msg, [ConnId, Err]),
+ ok
+ end;
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
+ {noreply, nostate};
+handle_cast({connection_closed, Details}, nostate) ->
+ ThisNode = node(),
+ case pget(node, Details) of
+ ThisNode ->
+ %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
+ %% {pid,<0.1774.0>},
+ %% {node, rabbit@hostname}]
+ unregister_connection(
+ {pget(node, Details),
+ pget(name, Details)});
+ _OtherNode ->
+ %% ignore
+ ok
+ end,
+ {noreply, nostate};
+handle_cast({vhost_deleted, Details}, nostate) ->
+ VHost = pget(name, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
+ close_connections(rabbit_connection_tracking:list(VHost),
+ rabbit_misc:format("vhost '~s' is deleted", [VHost])),
+ {noreply, nostate};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
+handle_cast({vhost_down, Details}, nostate) ->
+ VHost = pget(name, Details),
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
+ [VHost, Node]),
+ close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost])),
+ {noreply, nostate};
+handle_cast({user_deleted, Details}, nostate) ->
+ Username = pget(name, Details),
+ rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
+ close_connections(rabbit_connection_tracking:list_of_user(Username),
+ rabbit_misc:format("user '~s' is deleted", [Username])),
+ {noreply, nostate};
+%% A node had been deleted from the cluster.
+handle_cast({node_deleted, Details}, nostate) ->
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
+ rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node),
+ rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node),
+ {noreply, nostate};
+handle_cast(_Msg, nostate) ->
+ {noreply, nostate}.
+
+handle_info(_Info, nostate) ->
+ {noreply, nostate}.
+
+terminate(_Reason, nostate) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
%%
%% API
%%
@@ -339,3 +445,31 @@ tracked_connection_from_connection_state(#connection{
{type, network},
{peer_port, PeerPort},
{peer_host, PeerHost}]).
+
+close_connections(Tracked, Message) ->
+ close_connections(Tracked, Message, 0).
+
+close_connections(Tracked, Message, Delay) ->
+ [begin
+ close_connection(Conn, Message),
+ timer:sleep(Delay)
+ end || Conn <- Tracked],
+ ok.
+
+close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could has been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
+close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
+ %% Do an RPC call to the node running the direct client.
+ Node = node(Pid),
+ rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).
+
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 9525f5777c..e3ca753582 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -27,8 +27,6 @@
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--export([close_connections/3]).
-
-include_lib("rabbit.hrl").
-import(rabbit_misc, [pget/2]).
@@ -38,9 +36,15 @@
[rabbit_event, ?MODULE, []]}},
{cleanup, {gen_event, delete_handler,
[rabbit_event, ?MODULE, []]}},
- {requires, [rabbit_event, rabbit_node_monitor]},
+ {requires, [rabbit_connection_tracking]},
{enables, recovery}]}).
+-rabbit_boot_step({rabbit_connection_tracking,
+ [{description, "statistics event manager"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_connection_tracking]}},
+ {requires, [rabbit_event, rabbit_node_monitor]},
+ {enables, ?MODULE}]}).
%%
%% API
@@ -50,74 +54,26 @@ init([]) ->
{ok, []}.
handle_event(#event{type = connection_created, props = Details}, State) ->
- ThisNode = node(),
- case pget(node, Details) of
- ThisNode ->
- TConn = rabbit_connection_tracking:tracked_connection_from_connection_created(Details),
- ConnId = TConn#tracked_connection.id,
- try
- rabbit_connection_tracking:register_connection(TConn)
- catch
- error:{no_exists, _} ->
- Msg = "Could not register connection ~p for tracking, "
- "its table is not ready yet or the connection terminated prematurely",
- rabbit_log_connection:warning(Msg, [ConnId]),
- ok;
- error:Err ->
- Msg = "Could not register connection ~p for tracking: ~p",
- rabbit_log_connection:warning(Msg, [ConnId, Err]),
- ok
- end;
- _OtherNode ->
- %% ignore
- ok
- end,
+ gen_server:cast(rabbit_connection_tracking, {connection_created, Details}),
{ok, State};
handle_event(#event{type = connection_closed, props = Details}, State) ->
- ThisNode = node(),
- case pget(node, Details) of
- ThisNode ->
- %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>},
- %% {pid,<0.1774.0>},
- %% {node, rabbit@hostname}]
- rabbit_connection_tracking:unregister_connection(
- {pget(node, Details),
- pget(name, Details)});
- _OtherNode ->
- %% ignore
- ok
- end,
+ gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}),
{ok, State};
handle_event(#event{type = vhost_deleted, props = Details}, State) ->
- VHost = pget(name, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]),
- close_connections(rabbit_connection_tracking:list(VHost),
- rabbit_misc:format("vhost '~s' is deleted", [VHost])),
+ gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}),
{ok, State};
%% Note: under normal circumstances this will be called immediately
%% after the vhost_deleted above. Therefore we should be careful about
%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
- VHost = pget(name, Details),
- Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
- " because the vhost is stopping",
- [VHost, Node]),
- close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
- rabbit_misc:format("vhost '~s' is down", [VHost])),
+ gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}),
{ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
- Username = pget(name, Details),
- rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
- close_connections(rabbit_connection_tracking:list_of_user(Username),
- rabbit_misc:format("user '~s' is deleted", [Username])),
+ gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}),
{ok, State};
%% A node had been deleted from the cluster.
handle_event(#event{type = node_deleted, props = Details}, State) ->
- Node = pget(node, Details),
- rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]),
- rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node),
- rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node),
+ gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}),
{ok, State};
handle_event(_Event, State) ->
{ok, State}.
@@ -133,31 +89,3 @@ terminate(_Arg, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-
-close_connections(Tracked, Message) ->
- close_connections(Tracked, Message, 0).
-
-close_connections(Tracked, Message, Delay) ->
- [begin
- close_connection(Conn, Message),
- timer:sleep(Delay)
- end || Conn <- Tracked],
- ok.
-
-close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- try
- rabbit_networking:close_connection(Pid, Message)
- catch error:{not_a_connection, _} ->
- %% could has been closed concurrently, or the input
- %% is bogus. In any case, we should not terminate
- ok;
- _:Err ->
- %% ignore, don't terminate
- rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
- ok
- end;
-close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
- %% Do an RPC call to the node running the direct client.
- Node = node(Pid),
- rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]).