summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-02-11 00:29:40 +0300
committerMichael Klishin <michael@novemberain.com>2016-02-11 00:29:40 +0300
commitdbf9923039f6de4d18a3af0934146408eb9f455b (patch)
treefe46c6e8513d41a3b6fabb31557d117962220c56 /src
parent5d9d07476a85993b082bed651afdc6103be57af8 (diff)
parent61e6e6c33d447d2aa9e977d0244ad3a692670a8c (diff)
downloadrabbitmq-server-git-dbf9923039f6de4d18a3af0934146408eb9f455b.tar.gz
Merge pull request #617 from rabbitmq/rabbitmq-server-616
rabbit_mirror_queue_sync: Pause the syncer process if memory is low
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_sync.erl59
1 files changed, 56 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2d8bdfa860..a97a9b50c8 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
- syncer_loop(Ref, MPid, SPids1)
+ syncer_check_resources(Ref, MPid, SPids1)
end.
await_slaves(Ref, SPids) ->
@@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.
+syncer_check_resources(Ref, MPid, SPids) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ %% Before we ask the master node to send the first batch of messages
+ %% over here, we check if one node is already short on memory. If
+ %% that's the case, we wait for the alarm to be cleared before
+ %% starting the syncer loop.
+ AlarmedNodes = lists:any(
+ fun
+ ({{resource_limit, memory, _}, _}) -> true;
+ ({_, _}) -> false
+ end, rabbit_alarm:get_alarms()),
+ if
+ not AlarmedNodes ->
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids);
+ true ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end
+ end.
+
syncer_loop(Ref, MPid, SPids) ->
- MPid ! {next, Ref},
receive
+ {conserve_resources, memory, true} ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> syncer_loop(Ref, MPid, SPids1)
+ end;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.
+wait_for_resources(Ref, SPids) ->
+ receive
+ {conserve_resources, memory, false} ->
+ SPids;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ wait_for_resources(Ref, SPids);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ cancel;
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ SPids1 = wait_for_credit(lists:delete(SPid, SPids)),
+ wait_for_resources(Ref, SPids1)
+ end.
+
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave