diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2015-10-27 11:31:57 +0900 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2015-10-27 11:31:57 +0900 |
| commit | e0f0d3db7796723012cb70cb53cfcb534396e264 (patch) | |
| tree | 55d791d0bf52d520aa6342c47e3cd8fb752e7f53 /src | |
| parent | 161e256c2f5a3bb278957f6c1e0f43cf869ba5fa (diff) | |
| parent | 48e32a894e51ba2a6ee7433744196eb21d168066 (diff) | |
| download | rabbitmq-server-git-e0f0d3db7796723012cb70cb53cfcb534396e264.tar.gz | |
Merge branch 'fix-alarms' of https://github.com/binarin/rabbitmq-server into rabbitmq-server-379
rabbit_reader is now in rabbit-common.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_alarm.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 4 |
2 files changed, 92 insertions, 20 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 557fa31335..160129b008 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -13,6 +13,16 @@ %% The Initial Developer of the Original Code is GoPivotal, Inc. %% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved. %% +%% @doc There are two types of alarms handled by this module: +%% - per-node resource (disk, memory) alarms for whole cluster. If any node +%% has any alarm, then all publishing should be disabled througout the +%% cluster until every alarm clears. When some nodes sets such an alarm, +%% this information is automatically propagated through whole cluster. +%% `#alarms.alarmed_nodes' is being used to track this type of alarms. +%% - limits local to this node (file_descriptor_limit). Used for information +%% purposes only - logging and getting node status. No cluster-wide propagation +%% of this info happens. `#alarms.alarms' is being used to track this type of alarms. +%% @end -module(rabbit_alarm). @@ -28,20 +38,35 @@ -define(SERVER, ?MODULE). --record(alarms, {alertees, alarmed_nodes, alarms}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). +-record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()), + alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]), + alarms :: [alarm()]}). + +-type(local_alarm() :: 'file_descriptor_limit'). +-type(resource_alarm_source() :: 'disk' | 'node'). +-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}). +-type(alarm() :: local_alarm() | resource_alarm()). +-type(resource_alert() :: {WasAlarmSetForNode :: boolean(), + IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(), + NodeForWhichAlarmWasSetOrCleared :: node()}). + -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]). --spec(set_alarm/1 :: (any()) -> 'ok'). --spec(clear_alarm/1 :: (any()) -> 'ok'). +-spec(set_alarm/1 :: ({alarm(), []}) -> 'ok'). +-spec(clear_alarm/1 :: (alarm()) -> 'ok'). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(get_alarms/0 :: () -> [{alarm(), []}]). + +-else. + +-record(alarms, {alertees, alarmed_nodes, alarms}). -endif. @@ -68,6 +93,12 @@ start() -> stop() -> ok. +%% @doc +%% Registers handler that should be called on every resource alarm change. +%% Given call rabbit_alarm:register(Pid, {M, F, A}) the handler would be +%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source' +%% has type resource_alarm_source() and `Alert' has type resource_alert(). +%% @end register(Pid, AlertMFA) -> gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). @@ -79,10 +110,10 @@ get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity). on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}). on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}). -remote_conserve_resources(Pid, Source, true) -> +remote_conserve_resources(Pid, Source, {true, _, _}) -> gen_event:notify({?SERVER, node(Pid)}, {set_alarm, {{resource_limit, Source, node()}, []}}); -remote_conserve_resources(Pid, Source, false) -> +remote_conserve_resources(Pid, Source, {false, _, _}) -> gen_event:notify({?SERVER, node(Pid)}, {clear_alarm, {resource_limit, Source, node()}}). @@ -98,12 +129,17 @@ handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) -> {ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])), internal_register(Pid, AlertMFA, State)}; -handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> - {ok, Alarms, State}; +handle_call(get_alarms, State) -> + {ok, get_alarms(State), State}; handle_call(_Request, State) -> {ok, not_understood, State}. +handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) -> + case is_node_alarmed(Source, Node, State) of + true -> {ok, State}; + false -> handle_set_resource_alarm(Source, Node, State) + end; handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> case lists:member(Alarm, Alarms) of true -> {ok, State}; @@ -111,6 +147,13 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms}) end; +handle_event({clear_alarm, {resource_limit, Source, Node}}, State) -> + case is_node_alarmed(Source, Node, State) of + true -> + handle_clear_resource_alarm(Source, Node, State); + false -> + {ok, State} + end; handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) -> case lists:keymember(Alarm, 1, Alarms) of true -> handle_clear_alarm( @@ -127,8 +170,16 @@ handle_event({node_up, Node}, State) -> {register, self(), {?MODULE, remote_conserve_resources, []}}), {ok, State}; -handle_event({node_down, Node}, State) -> - {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)}; +handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) -> + AlarmsForDeadNode = case dict:find(Node, AN) of + {ok, V} -> V; + error -> [] + end, + {ok, lists:foldr(fun(Source, AccState) -> + rabbit_log:warning("~s resource limit alarm cleared for dead node ~p~n", + [Source, Node]), + maybe_alert(fun dict_unappend/3, Node, Source, false, AccState) + end, State, AlarmsForDeadNode)}; handle_event({register, Pid, AlertMFA}, State) -> {ok, internal_register(Pid, AlertMFA, State)}; @@ -158,9 +209,6 @@ dict_append(Key, Val, Dict) -> end, dict:store(Key, lists:usort([Val|L]), Dict). -dict_unappend_all(Key, _Val, Dict) -> - dict:erase(Key, Dict). - dict_unappend(Key, Val, Dict) -> L = case dict:find(Key, Dict) of {ok, V} -> V; @@ -172,10 +220,17 @@ dict_unappend(Key, Val, Dict) -> X -> dict:store(Key, X, Dict) end. -maybe_alert(UpdateFun, Node, Source, Alert, +maybe_alert(UpdateFun, Node, Source, WasAlertAdded, State = #alarms{alarmed_nodes = AN, alertees = Alertees}) -> AN1 = UpdateFun(Node, Source, AN), + %% Is alarm for Source is still set on some node? + StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)), + case StillHasAlerts of + true -> ok; + false -> rabbit_log:warning("~s resource limit alarm cleared from the whole cluster~n", [Source]) + end, + Alert = {WasAlertAdded, StillHasAlerts, Node}, case node() of Node -> ok = alert_remote(Alert, Alertees, Source); _ -> ok @@ -202,20 +257,21 @@ internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), case dict:find(node(), State#alarms.alarmed_nodes) of - {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; + {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources]; error -> ok end, NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. -handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> +handle_set_resource_alarm(Source, Node, State) -> rabbit_log:warning( "~s resource limit alarm set on node ~p.~n~n" "**********************************************************~n" "*** Publishers will be blocked until this alarm clears ***~n" "**********************************************************~n", [Source, Node]), - {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}; + {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}. + handle_set_alarm({file_descriptor_limit, []}, State) -> rabbit_log:warning( "file descriptor limit alarm set.~n~n" @@ -227,13 +283,27 @@ handle_set_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' set~n", [Alarm]), {ok, State}. -handle_clear_alarm({resource_limit, Source, Node}, State) -> +handle_clear_resource_alarm(Source, Node, State) -> rabbit_log:warning("~s resource limit alarm cleared on node ~p~n", [Source, Node]), - {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}; + {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}. + handle_clear_alarm(file_descriptor_limit, State) -> rabbit_log:warning("file descriptor limit alarm cleared~n"), {ok, State}; handle_clear_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' cleared~n", [Alarm]), {ok, State}. + +is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) -> + case dict:find(Node, AN) of + {ok, Sources} -> + lists:member(Source, Sources); + error -> + false + end. + +get_alarms(#alarms{alarms = Alarms, + alarmed_nodes = AN}) -> + Alarms ++ [ {{resource_limit, Source, Node}, []} + || {Node, Sources} <- dict:to_list(AN), Source <- Sources ]. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 24b3ae9af4..31f1da2271 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -86,7 +86,9 @@ report_ram_duration(Pid, QueueDuration) -> stop() -> gen_server2:cast(?SERVER, stop). -conserve_resources(Pid, disk, Conserve) -> +%% Paging should be enabled/disabled only in response to disk resource alarms +%% for current node. +conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node -> gen_server2:cast(Pid, {disk_alarm, Conserve}); conserve_resources(_Pid, _Source, _Conserve) -> ok. |
