summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2015-10-27 11:31:57 +0900
committerMichael Klishin <michael@clojurewerkz.org>2015-10-27 11:31:57 +0900
commite0f0d3db7796723012cb70cb53cfcb534396e264 (patch)
tree55d791d0bf52d520aa6342c47e3cd8fb752e7f53 /src
parent161e256c2f5a3bb278957f6c1e0f43cf869ba5fa (diff)
parent48e32a894e51ba2a6ee7433744196eb21d168066 (diff)
downloadrabbitmq-server-git-e0f0d3db7796723012cb70cb53cfcb534396e264.tar.gz
Merge branch 'fix-alarms' of https://github.com/binarin/rabbitmq-server into rabbitmq-server-379
rabbit_reader is now in rabbit-common.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_alarm.erl108
-rw-r--r--src/rabbit_memory_monitor.erl4
2 files changed, 92 insertions, 20 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 557fa31335..160129b008 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -13,6 +13,16 @@
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
%%
+%% @doc There are two types of alarms handled by this module:
+%% - per-node resource (disk, memory) alarms for the whole cluster. If any node
+%% has any alarm, then all publishing should be disabled throughout the
+%% cluster until every alarm clears. When a node sets such an alarm,
+%% this information is automatically propagated through the whole cluster.
+%% `#alarms.alarmed_nodes' is used to track this type of alarm.
+%% - limits local to this node (file_descriptor_limit). Used for information
+%% purposes only - logging and getting node status. No cluster-wide propagation
+%% of this info happens. `#alarms.alarms' is being used to track this type of alarms.
+%% @end
-module(rabbit_alarm).
@@ -28,20 +38,35 @@
-define(SERVER, ?MODULE).
--record(alarms, {alertees, alarmed_nodes, alarms}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+-record(alarms, {alertees :: dict:dict(pid(), rabbit_types:mfargs()),
+ alarmed_nodes :: dict:dict(node(), [resource_alarm_source()]),
+ alarms :: [alarm()]}).
+
+-type(local_alarm() :: 'file_descriptor_limit').
+-type(resource_alarm_source() :: 'disk' | 'node').
+-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
+-type(alarm() :: local_alarm() | resource_alarm()).
+-type(resource_alert() :: {WasAlarmSetForNode :: boolean(),
+ IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
+ NodeForWhichAlarmWasSetOrCleared :: node()}).
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> [atom()]).
--spec(set_alarm/1 :: (any()) -> 'ok').
--spec(clear_alarm/1 :: (any()) -> 'ok').
+-spec(set_alarm/1 :: ({alarm(), []}) -> 'ok').
+-spec(clear_alarm/1 :: (alarm()) -> 'ok').
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(get_alarms/0 :: () -> [{alarm(), []}]).
+
+-else.
+
+-record(alarms, {alertees, alarmed_nodes, alarms}).
-endif.
@@ -68,6 +93,12 @@ start() ->
stop() -> ok.
+%% @doc
+%% Registers a handler that is called on every resource alarm change.
+%% Given a call to rabbit_alarm:register(Pid, {M, F, A}), the handler is
+%% called like this: `apply(M, F, A ++ [Pid, Source, Alert])', where `Source'
+%% has type resource_alarm_source() and `Alert' has type resource_alert().
+%% @end
register(Pid, AlertMFA) ->
gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
@@ -79,10 +110,10 @@ get_alarms() -> gen_event:call(?SERVER, ?MODULE, get_alarms, infinity).
on_node_up(Node) -> gen_event:notify(?SERVER, {node_up, Node}).
on_node_down(Node) -> gen_event:notify(?SERVER, {node_down, Node}).
-remote_conserve_resources(Pid, Source, true) ->
+remote_conserve_resources(Pid, Source, {true, _, _}) ->
gen_event:notify({?SERVER, node(Pid)},
{set_alarm, {{resource_limit, Source, node()}, []}});
-remote_conserve_resources(Pid, Source, false) ->
+remote_conserve_resources(Pid, Source, {false, _, _}) ->
gen_event:notify({?SERVER, node(Pid)},
{clear_alarm, {resource_limit, Source, node()}}).
@@ -98,12 +129,17 @@ handle_call({register, Pid, AlertMFA}, State = #alarms{alarmed_nodes = AN}) ->
{ok, lists:usort(lists:append([V || {_, V} <- dict:to_list(AN)])),
internal_register(Pid, AlertMFA, State)};
-handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
- {ok, Alarms, State};
+handle_call(get_alarms, State) ->
+ {ok, get_alarms(State), State};
handle_call(_Request, State) ->
{ok, not_understood, State}.
+handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true -> {ok, State};
+ false -> handle_set_resource_alarm(Source, Node, State)
+ end;
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:member(Alarm, Alarms) of
true -> {ok, State};
@@ -111,6 +147,13 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
handle_set_alarm(Alarm, State#alarms{alarms = UpdatedAlarms})
end;
+handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
+ case is_node_alarmed(Source, Node, State) of
+ true ->
+ handle_clear_resource_alarm(Source, Node, State);
+ false ->
+ {ok, State}
+ end;
handle_event({clear_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:keymember(Alarm, 1, Alarms) of
true -> handle_clear_alarm(
@@ -127,8 +170,16 @@ handle_event({node_up, Node}, State) ->
{register, self(), {?MODULE, remote_conserve_resources, []}}),
{ok, State};
-handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], false, State)};
+handle_event({node_down, Node}, #alarms{alarmed_nodes = AN} = State) ->
+ AlarmsForDeadNode = case dict:find(Node, AN) of
+ {ok, V} -> V;
+ error -> []
+ end,
+ {ok, lists:foldr(fun(Source, AccState) ->
+ rabbit_log:warning("~s resource limit alarm cleared for dead node ~p~n",
+ [Source, Node]),
+ maybe_alert(fun dict_unappend/3, Node, Source, false, AccState)
+ end, State, AlarmsForDeadNode)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -158,9 +209,6 @@ dict_append(Key, Val, Dict) ->
end,
dict:store(Key, lists:usort([Val|L]), Dict).
-dict_unappend_all(Key, _Val, Dict) ->
- dict:erase(Key, Dict).
-
dict_unappend(Key, Val, Dict) ->
L = case dict:find(Key, Dict) of
{ok, V} -> V;
@@ -172,10 +220,17 @@ dict_unappend(Key, Val, Dict) ->
X -> dict:store(Key, X, Dict)
end.
-maybe_alert(UpdateFun, Node, Source, Alert,
+maybe_alert(UpdateFun, Node, Source, WasAlertAdded,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
+ %% Is an alarm for Source still set on some node?
+ StillHasAlerts = lists:any(fun ({_Node, NodeAlerts}) -> lists:member(Source, NodeAlerts) end, dict:to_list(AN1)),
+ case StillHasAlerts of
+ true -> ok;
+ false -> rabbit_log:warning("~s resource limit alarm cleared from the whole cluster~n", [Source])
+ end,
+ Alert = {WasAlertAdded, StillHasAlerts, Node},
case node() of
Node -> ok = alert_remote(Alert, Alertees, Source);
_ -> ok
@@ -202,20 +257,21 @@ internal_register(Pid, {M, F, A} = AlertMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
- {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
+ {ok, Sources} -> [apply(M, F, A ++ [Pid, R, {true, true, node()}]) || R <- Sources];
error -> ok
end,
NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.
-handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
+handle_set_resource_alarm(Source, Node, State) ->
rabbit_log:warning(
"~s resource limit alarm set on node ~p.~n~n"
"**********************************************************~n"
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, true, State)}.
+
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -227,13 +283,27 @@ handle_set_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' set~n", [Alarm]),
{ok, State}.
-handle_clear_alarm({resource_limit, Source, Node}, State) ->
+handle_clear_resource_alarm(Source, Node, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, false, State)}.
+
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
handle_clear_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' cleared~n", [Alarm]),
{ok, State}.
+
+is_node_alarmed(Source, Node, #alarms{alarmed_nodes = AN}) ->
+ case dict:find(Node, AN) of
+ {ok, Sources} ->
+ lists:member(Source, Sources);
+ error ->
+ false
+ end.
+
+get_alarms(#alarms{alarms = Alarms,
+ alarmed_nodes = AN}) ->
+ Alarms ++ [ {{resource_limit, Source, Node}, []}
+ || {Node, Sources} <- dict:to_list(AN), Source <- Sources ].
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 24b3ae9af4..31f1da2271 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -86,7 +86,9 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
-conserve_resources(Pid, disk, Conserve) ->
+%% Paging should be enabled/disabled only in response to disk resource alarms
+%% for the current node.
+conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node ->
gen_server2:cast(Pid, {disk_alarm, Conserve});
conserve_resources(_Pid, _Source, _Conserve) ->
ok.