summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2013-06-24 17:43:46 +0400
committerMichael Klishin <michael@rabbitmq.com>2013-06-24 17:43:46 +0400
commite9ba72ed59ecada718fc415ba47879ac22982dba (patch)
treefc3b96fd9bb5161a3322a06d94ef482eb36dac6d
parent19ca23aafc70361ffa0137966baa98793661c45c (diff)
downloadrabbitmq-server-git-e9ba72ed59ecada718fc415ba47879ac22982dba.tar.gz
Deliver all alarm notifications to handle overlapping alarms
connection.blocked requires us to track the resources we are conserving. This means the old logic of determining edge state transitions for alarms does not work any more. Instead of using the old strategy of comparing alarmed node collection sizes, we now pass around which event (set or clear) the notification is for and simply deliver it to the relevant nodes. This requires that rabbit_alarm event consumers handle duplicate notifications. They already do so after earlier changes on branch bug25191. This makes connection unblocking work correctly in the following sequence of events: * memory alarm set for node A * disk alarm set for node A * memory alarm cleared for node A * disk alarm cleared for node A as well as in other similar scenarios with overlapping alarms. This slightly increases internode and intranode message traffic for alarm notifications. Since alarms occur rarely in well-monitored systems, this is a reasonable trade-off.
-rw-r--r--src/rabbit_alarm.erl34
-rw-r--r--src/rabbit_reader.erl5
2 files changed, 15 insertions, 24 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 16b69af987..40be195198 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -133,7 +133,7 @@ handle_event({node_up, Node}, State) ->
{ok, State};
handle_event({node_down, Node}, State) ->
- {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
+ {ok, maybe_alert(fun dict_unappend_all/3, Node, [], clear, State)};
handle_event({register, Pid, AlertMFA}, State) ->
{ok, internal_register(Pid, AlertMFA, State)};
@@ -177,35 +177,25 @@ dict_unappend(Key, Val, Dict) ->
X -> dict:store(Key, X, Dict)
end.
-count_dict_values(Val, Dict) ->
- dict:fold(fun (_Node, List, Count) ->
- Count + case lists:member(Val, List) of
- true -> 1;
- false -> 0
- end
- end, 0, Dict).
-
-maybe_alert(UpdateFun, Node, Source,
+maybe_alert(UpdateFun, Node, Source, Event,
State = #alarms{alarmed_nodes = AN,
alertees = Alertees}) ->
AN1 = UpdateFun(Node, Source, AN),
- BeforeSz = count_dict_values(Source, AN),
- AfterSz = count_dict_values(Source, AN1),
%% If we have changed our alarm state, inform the remotes.
IsLocal = Node =:= node(),
- if IsLocal andalso BeforeSz < AfterSz ->
+ if IsLocal andalso Event =:= set ->
ok = alert_remote(true, Alertees, Source);
- IsLocal andalso BeforeSz > AfterSz ->
+ IsLocal andalso Event =:= clear ->
ok = alert_remote(false, Alertees, Source);
- true ->
+ true ->
ok
end,
- %% If the overall alarm state has changed, inform the locals.
- case {dict:size(AN), dict:size(AN1)} of
- {0, 1} -> ok = alert_local(true, Alertees, Source);
- {1, 0} -> ok = alert_local(false, Alertees, Source);
- {_, _} -> ok
+ case Event of
+ clear ->
+ ok = alert_local(false, Alertees, Source);
+ set ->
+ ok = alert_local(true, Alertees, Source)
end,
State#alarms{alarmed_nodes = AN1}.
@@ -241,7 +231,7 @@ handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
"*** Publishers will be blocked until this alarm clears ***~n"
"**********************************************************~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_append/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_append/3, Node, Source, set, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
rabbit_log:warning(
"file descriptor limit alarm set.~n~n"
@@ -256,7 +246,7 @@ handle_set_alarm(Alarm, State) ->
handle_clear_alarm({resource_limit, Source, Node}, State) ->
rabbit_log:warning("~s resource limit alarm cleared on node ~p~n",
[Source, Node]),
- {ok, maybe_alert(fun dict_unappend/3, Node, Source, State)};
+ {ok, maybe_alert(fun dict_unappend/3, Node, Source, clear, State)};
handle_clear_alarm(file_descriptor_limit, State) ->
rabbit_log:warning("file descriptor limit alarm cleared~n"),
{ok, State};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a272706707..31403ab820 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -416,8 +416,9 @@ terminate(_Explanation, State) ->
{force, State}.
control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
- case {CS, ((Throttle#throttle.conserve_resources =/= []) orelse
- credit_flow:blocked())} of
+ IsThrottled = ((Throttle#throttle.conserve_resources =/= []) orelse
+ credit_flow:blocked()),
+ case {CS, IsThrottled} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(