diff options
| author | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2016-02-10 14:34:34 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr> | 2016-02-10 14:34:34 +0100 |
| commit | 61e6e6c33d447d2aa9e977d0244ad3a692670a8c (patch) | |
| tree | fe46c6e8513d41a3b6fabb31557d117962220c56 /src | |
| parent | 5d9d07476a85993b082bed651afdc6103be57af8 (diff) | |
| download | rabbitmq-server-git-61e6e6c33d447d2aa9e977d0244ad3a692670a8c.tar.gz | |
rabbit_mirror_queue_sync: Pause the syncer process if memory is low
If any node in the cluster is running out of memory, we pause the syncer
process. If we don't do this, this alarmed node could crash.
Fixes #616.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 59 |
1 files changed, 56 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2d8bdfa860..a97a9b50c8 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7]). +-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) -> [] -> Log("all slaves already synced", []); SPids1 -> MPid ! {ready, self()}, Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), - syncer_loop(Ref, MPid, SPids1) + syncer_check_resources(Ref, MPid, SPids1) end. await_slaves(Ref, SPids) -> @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) -> %% 'sync_start' and so will not reply. We need to act as though they are %% down. +syncer_check_resources(Ref, MPid, SPids) -> + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + %% Before we ask the master node to send the first batch of messages + %% over here, we check if one node is already short on memory. If + %% that's the case, we wait for the alarm to be cleared before + %% starting the syncer loop. + AlarmedNodes = lists:any( + fun + ({{resource_limit, memory, _}, _}) -> true; + ({_, _}) -> false + end, rabbit_alarm:get_alarms()), + if + not AlarmedNodes -> + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids); + true -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end + end. + syncer_loop(Ref, MPid, SPids) -> - MPid ! {next, Ref}, receive + {conserve_resources, memory, true} -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> syncer_loop(Ref, MPid, SPids1) + end; + {conserve_resources, _, _} -> + %% Ignore other alerts. + syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -239,6 +270,10 @@ broadcast(SPids, Msg) -> SPid ! Msg end || SPid <- SPids]. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, + ok. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -252,6 +287,24 @@ wait_for_credit(SPids) -> false -> SPids end. +wait_for_resources(Ref, SPids) -> + receive + {conserve_resources, memory, false} -> + SPids; + {conserve_resources, _, _} -> + %% Ignore other alerts. + wait_for_resources(Ref, SPids); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + cancel; + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + SPids1 = wait_for_credit(lists:delete(SPid, SPids)), + wait_for_resources(Ref, SPids1) + end. + %% Syncer %% --------------------------------------------------------------------------- %% Slave |
