summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-01-09 03:46:24 +0300
committerGitHub <noreply@github.com>2019-01-09 03:46:24 +0300
commitbe27684a7b8651966530d594ba4ea454878cba8f (patch)
tree9fb30542259ea25498399170714da15ed8ae3ae0 /src
parenta375473f3d612b0e8a0db95c9e3805229e3aa868 (diff)
parent2b934f3357eee5a23c3d7e26ba326864f1e05c0c (diff)
downloadrabbitmq-server-git-be27684a7b8651966530d594ba4ea454878cba8f.tar.gz
Merge pull request #1816 from rabbitmq/rabbitmq-server-952
Add sysmon-handler to RabbitMQ
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_sysmon_handler.erl230
-rw-r--r--src/rabbit_sysmon_minder.erl156
3 files changed, 394 insertions, 1 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3401391b09..2a37d0ba75 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -156,6 +156,13 @@
{requires, kernel_ready},
{enables, core_initialized}]}).
+%% Boot step for rabbit_sysmon_minder: its MFA invokes
+%% rabbit_sup:start_restartable_child(rabbit_sysmon_minder). The step
+%% requires kernel_ready and is declared as enabling core_initialized.
+-rabbit_boot_step({rabbit_sysmon_minder,
+ [{description, "sysmon_handler supervisor"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_sysmon_minder]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
@@ -225,7 +232,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]).
+-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]).
-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
diff --git a/src/rabbit_sysmon_handler.erl b/src/rabbit_sysmon_handler.erl
new file mode 100644
index 0000000000..4e878f618d
--- /dev/null
+++ b/src/rabbit_sysmon_handler.erl
@@ -0,0 +1,230 @@
+%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+
+%% @doc A custom event handler to the `sysmon_handler' application's
+%% `system_monitor' event manager.
+%%
+%% This module attempts to discover more information about a process
+%% that generates a system_monitor event.
+
+-module(rabbit_sysmon_handler).
+
+-behaviour(gen_event).
+
+%% API
+-export([add_handler/0]).
+
+%% gen_event callbacks
+-export([init/1, handle_event/2, handle_call/2,
+ handle_info/2, terminate/2, code_change/3]).
+
+%% Handler state. timer_ref is the reference of the pending
+%% inactivity timer. init/1 builds #state{} without setting it, so it
+%% is `undefined' until the first event arrives and the declared type
+%% has to admit that value.
+-record(state, {timer_ref :: reference() | undefined}).
+
+-define(INACTIVITY_TIMEOUT, 5000).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Install this module as a custom handler through
+%% sysmon_handler_filter:add_custom_handler/2, unless it is already
+%% listed by gen_event:which_handlers(sysmon_handler). Returns `ok'
+%% when already installed, otherwise whatever add_custom_handler/2
+%% returns.
+add_handler() ->
+    %% The membership check and the installation are two separate
+    %% steps, so concurrent callers could both install the handler.
+    %% In the usual OTP application startup there is a single caller,
+    %% which makes that race a non-issue in practice.
+    Installed = gen_event:which_handlers(sysmon_handler),
+    case lists:member(?MODULE, Installed) of
+        false ->
+            sysmon_handler_filter:add_custom_handler(?MODULE, []);
+        true ->
+            ok
+    end.
+
+%%%===================================================================
+%%% gen_event callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run when this handler is added to an event
+%% manager. The handler starts with no inactivity timer armed and
+%% asks for hibernation straight away.
+%%
+%% @spec init(Args) -> {ok, State, hibernate}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+    InitialState = #state{},
+    {ok, InitialState, hibernate}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run for every event delivered to the event
+%% manager with gen_event:notify/2 or gen_event:sync_notify/2.
+%%
+%% Each event restarts the inactivity timer first. After that:
+%% a `monitor' event about the calling process itself may trigger a
+%% garbage collection, a `monitor' event about any other pid or port
+%% is logged as a warning together with whatever details can be
+%% gathered about it, and any other event is logged as unhandled.
+%%
+%% @spec handle_event(Event, State) -> {ok, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_event(Event, State = #state{timer_ref = OldTimerRef}) ->
+    %% Restart the inactivity timeout before acting on the event.
+    FreshTimerRef = reset_timer(OldTimerRef),
+    act_on_event(Event),
+    {ok, State#state{timer_ref = FreshTimerRef}}.
+
+%% Performs the side effect that belongs to one event; the return
+%% value is ignored by handle_event/2.
+act_on_event({monitor, Pid, Type, _Info}) when Pid == self() ->
+    %% The event is about the process running this handler.
+    maybe_collect_garbage(Type);
+act_on_event({monitor, PidOrPort, Type, Info}) ->
+    {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort),
+    rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w",
+                       [?MODULE, Type, PidOrPort] ++ Args ++ [Info]);
+act_on_event(Event) ->
+    rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]).
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run for requests sent with gen_event:call/3,4.
+%% This handler has no call protocol: every request is answered with
+%% `not_supported' and the state is returned untouched.
+%%
+%% @spec handle_call(Request, State) -> {ok, not_supported, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call(_Request, HandlerState) ->
+    {ok, not_supported, HandlerState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run for any message that is neither an event
+%% nor a synchronous request (nor a system message).
+%%
+%% `inactivity_timeout' is the message scheduled by reset_timer/1; it
+%% makes the handler request hibernation. Any other message is
+%% logged at info level and otherwise ignored.
+%%
+%% @spec handle_info(Info, State) -> {ok, State} |
+%%                                   {ok, State, hibernate}
+%% @end
+%%--------------------------------------------------------------------
+handle_info(inactivity_timeout, IdleState) ->
+    %% Nothing arrived during the timeout period: hibernate to free
+    %% up resources.
+    {ok, IdleState, hibernate};
+handle_info(Unexpected, HandlerState) ->
+    rabbit_log:info("handle_info got ~p", [Unexpected]),
+    {ok, HandlerState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run when this handler is removed from an event
+%% manager. There is nothing to clean up here, so it only returns
+%% `ok'.
+%%
+%% @spec terminate(Reason, State) -> ok
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Why, _HandlerState) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% gen_event callback run on a code upgrade. The state needs no
+%% conversion and is handed back as it is.
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, State}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_FromVsn, HandlerState, _Extra) ->
+    {ok, HandlerState}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Returns a `{Format, Args}' pair describing PidOrPort, ready to be
+%% spliced into a log call. When no information is available the
+%% pair is `{"", []}'. Any exception raised while collecting the
+%% information is turned into a pair describing that exception, so
+%% this function itself does not raise.
+format_pretty_proc_or_port_info(PidOrPort) ->
+    try get_pretty_proc_or_port_info(PidOrPort) of
+        undefined ->
+            {"", []};
+        FmtAndArgs ->
+            FmtAndArgs
+    catch
+        Class:Reason:Stacktrace ->
+            {"Pid ~w, ~W ~W at ~w\n",
+             [PidOrPort, Class, 20, Reason, 20, Stacktrace]}
+    end.
+
+%% Collects loggable details about a process or a port.
+%%
+%% Returns `{Format, Args}' suitable for an io:format-style call, or
+%% `undefined' when the process is no longer alive or the port is no
+%% longer open.
+%%
+%% For a pid the single argument is a list holding the registered
+%% name (only when there is one), the initial call, the current
+%% function and the message queue length. For a port the arguments
+%% are the port name followed by the remaining port_info items, the
+%% queue size and, when it can be determined, the initial call of the
+%% connected process.
+get_pretty_proc_or_port_info(Pid) when is_pid(Pid) ->
+    Infos = [registered_name, initial_call, current_function, message_queue_len],
+    case process_info(Pid, Infos) of
+        undefined ->
+            undefined;
+        [] ->
+            undefined;
+        [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] ->
+            ICT = case proc_lib:translate_initial_call(Pid) of
+                      {proc_lib, init_p, 5} -> % not by proc_lib, see docs
+                          ICT1;
+                      ICT2 ->
+                          {initial_call, ICT2}
+                  end,
+            RNL = if RN0 == [] -> [];
+                     true -> [{name, RN0}]
+                  end,
+            {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]}
+    end;
+get_pretty_proc_or_port_info(Port) when is_port(Port) ->
+    case erlang:port_info(Port) of
+        undefined ->
+            %% The port was closed before we could inspect it. Report
+            %% "no information", as is done for a dead process,
+            %% instead of crashing in lists:keytake/3.
+            undefined;
+        PortInfo ->
+            {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo),
+            Extras = port_queue_size(Port) ++ port_owner_initial_call(PortInfo2),
+            {"name ~s ~w", [Name, PortInfo2 ++ Extras]}
+    end.
+
+%% The `{queue_size, N}' item of Port as a one-element list, or `[]'
+%% when the port was closed in the meantime.
+port_queue_size(Port) ->
+    case erlang:port_info(Port, queue_size) of
+        undefined ->
+            [];
+        QueueSize ->
+            [QueueSize]
+    end.
+
+%% `[{initial_call, MFA}]' for the process connected to the port when
+%% proc_lib can translate it, `[]' otherwise.
+port_owner_initial_call(PortInfo) ->
+    case proplists:get_value(connected, PortInfo) of
+        undefined ->
+            [];
+        ConnectedPid ->
+            case proc_lib:translate_initial_call(ConnectedPid) of
+                {proc_lib, init_p, 5} -> % not by proc_lib, see docs
+                    [];
+                ICT ->
+                    [{initial_call, ICT}]
+            end
+    end.
+
+
+%% @doc Runs a garbage collection of the calling process when the
+%% event type is `large_heap'. It is used for events raised about
+%% this very process, so that a large-heap warning is answered by
+%% shrinking the heap rather than feeding a death spiral. Every other
+%% type is a no-op.
+-spec maybe_collect_garbage(atom()) -> ok.
+maybe_collect_garbage(EventType) ->
+    case EventType of
+        large_heap ->
+            _ = erlang:garbage_collect(),
+            ok;
+        _Other ->
+            ok
+    end.
+
+%% Cancels the previous inactivity timer, if there is one, and arms a
+%% new one that sends `inactivity_timeout' to the calling process
+%% after ?INACTIVITY_TIMEOUT milliseconds. Returns the reference of
+%% the new timer.
+-spec reset_timer(undefined | reference()) -> reference().
+reset_timer(PreviousRef) ->
+    case PreviousRef of
+        undefined ->
+            ok;
+        _ ->
+            _ = erlang:cancel_timer(PreviousRef)
+    end,
+    erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout).
diff --git a/src/rabbit_sysmon_minder.erl b/src/rabbit_sysmon_minder.erl
new file mode 100644
index 0000000000..b2f5dd6316
--- /dev/null
+++ b/src/rabbit_sysmon_minder.erl
@@ -0,0 +1,156 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(rabbit_sysmon_minder).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the minder as a gen_server linked to the caller and
+%% registered locally under the module name.
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+    ServerName = {local, ?MODULE},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server by installing the system_monitor event
+%% handler.
+%%
+%% @spec init(Args) -> {ok, State}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+    %% The handler is installed from this process on purpose: should
+    %% the handler ever crash and be removed from the sysmon_handler
+    %% gen_event server (very unlikely), this process is the one that
+    %% receives the notification and can react to it in
+    %% handle_info/2. Had a supervisor or the application starter
+    %% installed it, nobody would act on that notification.
+    rabbit_sysmon_handler:add_handler(),
+    InitialState = #state{},
+    {ok, InitialState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handles call messages. The minder has no call protocol: every
+%% request is answered with `ok' and the state is left unchanged.
+%%
+%% @spec handle_call(Request, From, State) -> {reply, ok, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call(_Request, _Caller, ServerState) ->
+    {reply, ok, ServerState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handles cast messages. Every cast is ignored and the state is left
+%% unchanged.
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Cast, ServerState) ->
+    {noreply, ServerState}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handles all non call/cast messages.
+%%
+%% A `gen_event_EXIT' message for rabbit_sysmon_handler schedules the
+%% re-installation of the handler two seconds later. The
+%% `reinstall_sysmon_handler' message produced by that timer performs
+%% the re-installation. Any other message is ignored.
+%%
+%% @spec handle_info(Info, State) -> {noreply, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({gen_event_EXIT, rabbit_sysmon_handler, _}, State) ->
+    %% SASL will create an error message, no need for us to duplicate it.
+    %%
+    %% Our handler should never crash, but it did indeed crash. If
+    %% there's a pathological condition somewhere that's generating
+    %% lots of unforeseen things that crash our custom handler, we
+    %% could make things worse by jumping back into the exploding
+    %% volcano. Wait a little bit before jumping back. Besides, the
+    %% system_monitor data is nice but is not critical: there is no
+    %% need to make things worse if things are indeed bad, and if we
+    %% miss a few seconds of system_monitor events, the world will not
+    %% end.
+    %%
+    %% The wait is done with a timer message rather than by sleeping
+    %% inside the callback, so the server keeps processing other
+    %% messages in the meantime.
+    _ = erlang:send_after(2*1000, self(), reinstall_sysmon_handler),
+    {noreply, State};
+handle_info(reinstall_sysmon_handler, State) ->
+    %% add_handler/0 does nothing when the handler is already
+    %% installed, so repeated reinstall requests are harmless.
+    rabbit_sysmon_handler:add_handler(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Called by the gen_server when it is about to terminate. The
+%% minder holds nothing that needs cleaning up, so this only returns
+%% `ok'; the return value is ignored by gen_server.
+%%
+%% @spec terminate(Reason, State) -> ok
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Why, _ServerState) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Called on a code upgrade. The state needs no conversion and is
+%% handed back as it is.
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, State}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_FromVsn, ServerState, _Extra) ->
+    {ok, ServerState}.