summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2016-02-10 14:34:34 +0100
committerJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2016-02-10 14:34:34 +0100
commit61e6e6c33d447d2aa9e977d0244ad3a692670a8c (patch)
treefe46c6e8513d41a3b6fabb31557d117962220c56 /src
parent5d9d07476a85993b082bed651afdc6103be57af8 (diff)
downloadrabbitmq-server-git-61e6e6c33d447d2aa9e977d0244ad3a692670a8c.tar.gz
rabbit_mirror_queue_sync: Pause the syncer process if memory is low
If any node in the cluster is running out of memory, we pause the syncer process. If we don't do this, this alarmed node could crash. Fixes #616.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_sync.erl59
1 files changed, 56 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2d8bdfa860..a97a9b50c8 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
- syncer_loop(Ref, MPid, SPids1)
+ syncer_check_resources(Ref, MPid, SPids1)
end.
await_slaves(Ref, SPids) ->
@@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.
+syncer_check_resources(Ref, MPid, SPids) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ %% Before we ask the master node to send the first batch of messages
+ %% over here, we check if one node is already short on memory. If
+ %% that's the case, we wait for the alarm to be cleared before
+ %% starting the syncer loop.
+ AlarmedNodes = lists:any(
+ fun
+ ({{resource_limit, memory, _}, _}) -> true;
+ ({_, _}) -> false
+ end, rabbit_alarm:get_alarms()),
+ if
+ not AlarmedNodes ->
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids);
+ true ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end
+ end.
+
syncer_loop(Ref, MPid, SPids) ->
- MPid ! {next, Ref},
receive
+ {conserve_resources, memory, true} ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> syncer_loop(Ref, MPid, SPids1)
+ end;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.
+wait_for_resources(Ref, SPids) ->
+ receive
+ {conserve_resources, memory, false} ->
+ SPids;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ wait_for_resources(Ref, SPids);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ cancel;
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ SPids1 = wait_for_credit(lists:delete(SPid, SPids)),
+ wait_for_resources(Ref, SPids1)
+ end.
+
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave