diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-01-09 03:46:24 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-09 03:46:24 +0300 |
| commit | be27684a7b8651966530d594ba4ea454878cba8f (patch) | |
| tree | 9fb30542259ea25498399170714da15ed8ae3ae0 /src | |
| parent | a375473f3d612b0e8a0db95c9e3805229e3aa868 (diff) | |
| parent | 2b934f3357eee5a23c3d7e26ba326864f1e05c0c (diff) | |
| download | rabbitmq-server-git-be27684a7b8651966530d594ba4ea454878cba8f.tar.gz | |
Merge pull request #1816 from rabbitmq/rabbitmq-server-952
Add sysmon-handler to RabbitMQ
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_sysmon_handler.erl | 230 | ||||
| -rw-r--r-- | src/rabbit_sysmon_minder.erl | 156 |
3 files changed, 394 insertions, 1 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 3401391b09..2a37d0ba75 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -156,6 +156,13 @@ {requires, kernel_ready}, {enables, core_initialized}]}). +-rabbit_boot_step({rabbit_sysmon_minder, + [{description, "sysmon_handler supervisor"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_sysmon_minder]}}, + {requires, kernel_ready}, + {enables, core_initialized}]}). + -rabbit_boot_step({core_initialized, [{description, "core initialized"}, {requires, kernel_ready}]}). @@ -225,7 +232,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --define(APPS, [os_mon, mnesia, rabbit_common, ra, rabbit]). +-define(APPS, [os_mon, mnesia, rabbit_common, ra, sysmon_handler, rabbit]). -define(ASYNC_THREADS_WARNING_THRESHOLD, 8). diff --git a/src/rabbit_sysmon_handler.erl b/src/rabbit_sysmon_handler.erl new file mode 100644 index 0000000000..4e878f618d --- /dev/null +++ b/src/rabbit_sysmon_handler.erl @@ -0,0 +1,230 @@ +%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. + +%% @doc A custom event handler to the `sysmon_handler' application's +%% `system_monitor' event manager. +%% +%% This module attempts to discover more information about a process +%% that generates a system_monitor event. + +-module(rabbit_sysmon_handler). + +-behaviour(gen_event). 
+ +%% API +-export([add_handler/0]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +-record(state, {timer_ref :: reference()}). + +-define(INACTIVITY_TIMEOUT, 5000). + +%%%=================================================================== +%%% API +%%%=================================================================== + +add_handler() -> + %% Vulnerable to race conditions (installing handler multiple + %% times), but risk is zero in the common OTP app startup case. + case lists:member(?MODULE, gen_event:which_handlers(sysmon_handler)) of + true -> + ok; + false -> + sysmon_handler_filter:add_custom_handler(?MODULE, []) + end. + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a new event handler is added to an event manager, +%% this function is called to initialize the event handler. +%% +%% @spec init(Args) -> {ok, State} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}, hibernate}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives an event sent using +%% gen_event:notify/2 or gen_event:sync_notify/2, this function is +%% called for each installed event handler to handle the event. 
+%% +%% @spec handle_event(Event, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_event({monitor, Pid, Type, _Info}, + State=#state{timer_ref=TimerRef}) when Pid == self() -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + maybe_collect_garbage(Type), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event({monitor, PidOrPort, Type, Info}, State=#state{timer_ref=TimerRef}) -> + %% Reset the inactivity timeout + NewTimerRef = reset_timer(TimerRef), + {Fmt, Args} = format_pretty_proc_or_port_info(PidOrPort), + rabbit_log:warning("~p ~w ~w " ++ Fmt ++ " ~w", [?MODULE, Type, PidOrPort] ++ Args ++ [Info]), + {ok, State#state{timer_ref=NewTimerRef}}; +handle_event(Event, State=#state{timer_ref=TimerRef}) -> + NewTimerRef = reset_timer(TimerRef), + rabbit_log:warning("~p unhandled event: ~p", [?MODULE, Event]), + {ok, State#state{timer_ref=NewTimerRef}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives a request sent using +%% gen_event:call/3,4, this function is called for the specified +%% event handler to handle the request. +%% +%% @spec handle_call(Request, State) -> +%% {ok, Reply, State} | +%% {swap_handler, Reply, Args1, State1, Mod2, Args2} | +%% {remove_handler, Reply} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Call, State) -> + Reply = not_supported, + {ok, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for each installed event handler when +%% an event manager receives any other message than an event or a +%% synchronous request (or a system message). 
+%% +%% @spec handle_info(Info, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_info(inactivity_timeout, State) -> + %% No events have arrived for the timeout period + %% so hibernate to free up resources. + {ok, State, hibernate}; +handle_info(Info, State) -> + rabbit_log:info("handle_info got ~p", [Info]), + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event handler is deleted from an event manager, this +%% function is called. It should be the opposite of Module:init/1 and +%% do any necessary cleaning up. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +format_pretty_proc_or_port_info(PidOrPort) -> + try + case get_pretty_proc_or_port_info(PidOrPort) of + undefined -> + {"", []}; + Res -> + Res + end + catch C:E:S -> + {"Pid ~w, ~W ~W at ~w\n", + [PidOrPort, C, 20, E, 20, S]} + end. 
+ +get_pretty_proc_or_port_info(Pid) when is_pid(Pid) -> + Infos = [registered_name, initial_call, current_function, message_queue_len], + case process_info(Pid, Infos) of + undefined -> + undefined; + [] -> + undefined; + [{registered_name, RN0}, ICT1, {_, CF}, {_, MQL}] -> + ICT = case proc_lib:translate_initial_call(Pid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + ICT1; + ICT2 -> + {initial_call, ICT2} + end, + RNL = if RN0 == [] -> []; + true -> [{name, RN0}] + end, + {"~w", [RNL ++ [ICT, CF, {message_queue_len, MQL}]]} + end; +get_pretty_proc_or_port_info(Port) when is_port(Port) -> + PortInfo = erlang:port_info(Port), + {value, {name, Name}, PortInfo2} = lists:keytake(name, 1, PortInfo), + QueueSize = [erlang:port_info(Port, queue_size)], + Connected = case proplists:get_value(connected, PortInfo2) of + undefined -> + []; + ConnectedPid -> + case proc_lib:translate_initial_call(ConnectedPid) of + {proc_lib, init_p, 5} -> % not by proc_lib, see docs + []; + ICT -> + [{initial_call, ICT}] + end + end, + {"name ~s ~w", [Name, lists:append([PortInfo2, QueueSize, Connected])]}. + + +%% @doc If the message type is due to a large heap warning +%% and the source is ourself, go ahead and collect garbage +%% to avoid the death spiral. +-spec maybe_collect_garbage(atom()) -> ok. +maybe_collect_garbage(large_heap) -> + erlang:garbage_collect(), + ok; +maybe_collect_garbage(_) -> + ok. + +-spec reset_timer(undefined | reference()) -> reference(). +reset_timer(undefined) -> + erlang:send_after(?INACTIVITY_TIMEOUT, self(), inactivity_timeout); +reset_timer(TimerRef) -> + _ = erlang:cancel_timer(TimerRef), + reset_timer(undefined). diff --git a/src/rabbit_sysmon_minder.erl b/src/rabbit_sysmon_minder.erl new file mode 100644 index 0000000000..b2f5dd6316 --- /dev/null +++ b/src/rabbit_sysmon_minder.erl @@ -0,0 +1,156 @@ +%% ------------------------------------------------------------------- +%% Copyright (c) 2007-2010 Basho Technologies, Inc. 
All Rights Reserved. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(rabbit_sysmon_minder). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
+ +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + %% Add our system_monitor event handler. We do that here because + %% we have a process at our disposal (i.e. ourself) to receive the + %% notification in the very unlikely event that the + %% sysmon_handler has crashed and been removed from the + %% sysmon_handler gen_event server. (If we had a supervisor + %% or app-starting process add the handler, then if the handler + %% crashes, nobody will act on the crash notification.) + rabbit_sysmon_handler:add_handler(), + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info({gen_event_EXIT, rabbit_sysmon_handler, _}, State) -> + %% SASL will create an error message, no need for us to duplicate it. + %% + %% Our handler should never crash, but it did indeed crash. If + %% there's a pathological condition somewhere that's generating + %% lots of unforeseen things that crash our custom handler, we + %% could make things worse by jumping back into the exploding + %% volcano. Wait a little bit before jumping back. Besides, the + %% system_monitor data is nice but is not critical: there is no + %% need to make things worse if things are indeed bad, and if we + %% miss a few seconds of system_monitor events, the world will not + %% end. + timer:sleep(2*1000), + rabbit_sysmon_handler:add_handler(), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. |
