diff options
| -rw-r--r-- | src/rabbit_connection_tracking.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_connection_tracking_handler.erl | 98 |
2 files changed, 147 insertions, 85 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index cdfc0005ee..34c12f41cb 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -16,6 +16,8 @@ -module(rabbit_connection_tracking). +-behaviour(gen_server). + %% Abstracts away how tracked connection records are stored %% and queried. %% @@ -43,6 +45,110 @@ -import(rabbit_misc, [pget/2]). +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([close_connections/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%% +%% GenServer API +%% + +init([]) -> + {ok, nostate}. + +handle_call(_Msg, _From, nostate) -> + {reply, ok, nostate}. + + +handle_cast({connection_created, Details}, nostate) -> + ThisNode = node(), + case pget(node, Details) of + ThisNode -> + TConn = tracked_connection_from_connection_created(Details), + ConnId = TConn#tracked_connection.id, + try + register_connection(TConn) + catch + error:{no_exists, _} -> + Msg = "Could not register connection ~p for tracking, " + "its table is not ready yet or the connection terminated prematurely", + rabbit_log_connection:warning(Msg, [ConnId]), + ok; + error:Err -> + Msg = "Could not register connection ~p for tracking: ~p", + rabbit_log_connection:warning(Msg, [ConnId, Err]), + ok + end; + _OtherNode -> + %% ignore + ok + end, + {noreply, nostate}; +handle_cast({connection_closed, Details}, nostate) -> + ThisNode = node(), + case pget(node, Details) of + ThisNode -> + %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, + %% {pid,<0.1774.0>}, + %% {node, rabbit@hostname}] + unregister_connection( + {pget(node, Details), + pget(name, Details)}); + _OtherNode -> + %% ignore + ok + end, + {noreply, nostate}; +handle_cast({vhost_deleted, Details}, nostate) -> + VHost = pget(name, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), + close_connections(rabbit_connection_tracking:list(VHost), + rabbit_misc:format("vhost '~s' is deleted", [VHost])), + {noreply, nostate}; +%% Note: under normal circumstances this will be called immediately +%% after the vhost_deleted above. Therefore we should be careful about +%% what we log and be more defensive. +handle_cast({vhost_down, Details}, nostate) -> + VHost = pget(name, Details), + Node = pget(node, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" + " because the vhost is stopping", + [VHost, Node]), + close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])), + {noreply, nostate}; +handle_cast({user_deleted, Details}, nostate) -> + Username = pget(name, Details), + rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), + close_connections(rabbit_connection_tracking:list_of_user(Username), + rabbit_misc:format("user '~s' is deleted", [Username])), + {noreply, nostate}; +%% A node had been deleted from the cluster. +handle_cast({node_deleted, Details}, nostate) -> + Node = pget(node, Details), + rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), + rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node), + rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node), + {noreply, nostate}; +handle_cast(_Msg, nostate) -> + {noreply, nostate}. + +handle_info(_Info, nostate) -> + {noreply, nostate}. + +terminate(_Reason, nostate) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %% %% API %% @@ -339,3 +445,31 @@ tracked_connection_from_connection_state(#connection{ {type, network}, {peer_port, PeerPort}, {peer_host, PeerHost}]). + +close_connections(Tracked, Message) -> + close_connections(Tracked, Message, 0). + +close_connections(Tracked, Message, Delay) -> + [begin + close_connection(Conn, Message), + timer:sleep(Delay) + end || Conn <- Tracked], + ok. + +close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> + try + rabbit_networking:close_connection(Pid, Message) + catch error:{not_a_connection, _} -> + %% could has been closed concurrently, or the input + %% is bogus. In any case, we should not terminate + ok; + _:Err -> + %% ignore, don't terminate + rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]), + ok + end; +close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> + %% Do an RPC call to the node running the direct client. + Node = node(Pid), + rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). + diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index 9525f5777c..e3ca753582 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -27,8 +27,6 @@ -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --export([close_connections/3]). - -include_lib("rabbit.hrl"). -import(rabbit_misc, [pget/2]). @@ -38,9 +36,15 @@ [rabbit_event, ?MODULE, []]}}, {cleanup, {gen_event, delete_handler, [rabbit_event, ?MODULE, []]}}, - {requires, [rabbit_event, rabbit_node_monitor]}, + {requires, [rabbit_connection_tracking]}, {enables, recovery}]}). +-rabbit_boot_step({rabbit_connection_tracking, + [{description, "statistics event manager"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_connection_tracking]}}, + {requires, [rabbit_event, rabbit_node_monitor]}, + {enables, ?MODULE}]}). %% %% API @@ -50,74 +54,26 @@ init([]) -> {ok, []}. handle_event(#event{type = connection_created, props = Details}, State) -> - ThisNode = node(), - case pget(node, Details) of - ThisNode -> - TConn = rabbit_connection_tracking:tracked_connection_from_connection_created(Details), - ConnId = TConn#tracked_connection.id, - try - rabbit_connection_tracking:register_connection(TConn) - catch - error:{no_exists, _} -> - Msg = "Could not register connection ~p for tracking, " - "its table is not ready yet or the connection terminated prematurely", - rabbit_log_connection:warning(Msg, [ConnId]), - ok; - error:Err -> - Msg = "Could not register connection ~p for tracking: ~p", - rabbit_log_connection:warning(Msg, [ConnId, Err]), - ok - end; - _OtherNode -> - %% ignore - ok - end, + gen_server:cast(rabbit_connection_tracking, {connection_created, Details}), {ok, State}; handle_event(#event{type = connection_closed, props = Details}, State) -> - ThisNode = node(), - case pget(node, Details) of - ThisNode -> - %% [{name,<<"127.0.0.1:64078 -> 127.0.0.1:5672">>}, - %% {pid,<0.1774.0>}, - %% {node, rabbit@hostname}] - rabbit_connection_tracking:unregister_connection( - {pget(node, Details), - pget(name, Details)}); - _OtherNode -> - %% ignore - ok - end, + gen_server:cast(rabbit_connection_tracking, {connection_closed, Details}), {ok, State}; handle_event(#event{type = vhost_deleted, props = Details}, State) -> - VHost = pget(name, Details), - rabbit_log_connection:info("Closing all connections in vhost '~s' because it's being deleted", [VHost]), - close_connections(rabbit_connection_tracking:list(VHost), - rabbit_misc:format("vhost '~s' is deleted", [VHost])), + gen_server:cast(rabbit_connection_tracking, {vhost_deleted, Details}), {ok, State}; %% Note: under normal circumstances this will be called immediately %% after the vhost_deleted above. Therefore we should be careful about %% what we log and be more defensive. handle_event(#event{type = vhost_down, props = Details}, State) -> - VHost = pget(name, Details), - Node = pget(node, Details), - rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'" - " because the vhost is stopping", - [VHost, Node]), - close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), - rabbit_misc:format("vhost '~s' is down", [VHost])), + gen_server:cast(rabbit_connection_tracking, {vhost_down, Details}), {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> - Username = pget(name, Details), - rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), - close_connections(rabbit_connection_tracking:list_of_user(Username), - rabbit_misc:format("user '~s' is deleted", [Username])), + gen_server:cast(rabbit_connection_tracking, {user_deleted, Details}), {ok, State}; %% A node had been deleted from the cluster. handle_event(#event{type = node_deleted, props = Details}, State) -> - Node = pget(node, Details), - rabbit_log_connection:info("Node '~s' was removed from the cluster, deleting its connection tracking tables...", [Node]), - rabbit_connection_tracking:delete_tracked_connections_table_for_node(Node), - rabbit_connection_tracking:delete_per_vhost_tracked_connections_table_for_node(Node), + gen_server:cast(rabbit_connection_tracking, {node_deleted, Details}), {ok, State}; handle_event(_Event, State) -> {ok, State}. @@ -133,31 +89,3 @@ terminate(_Arg, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - - -close_connections(Tracked, Message) -> - close_connections(Tracked, Message, 0). - -close_connections(Tracked, Message, Delay) -> - [begin - close_connection(Conn, Message), - timer:sleep(Delay) - end || Conn <- Tracked], - ok. - -close_connection(#tracked_connection{pid = Pid, type = network}, Message) -> - try - rabbit_networking:close_connection(Pid, Message) - catch error:{not_a_connection, _} -> - %% could has been closed concurrently, or the input - %% is bogus. In any case, we should not terminate - ok; - _:Err -> - %% ignore, don't terminate - rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]), - ok - end; -close_connection(#tracked_connection{pid = Pid, type = direct}, Message) -> - %% Do an RPC call to the node running the direct client. - Node = node(Pid), - rpc:call(Node, amqp_direct_connection, server_close, [Pid, 320, Message]). |
