summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl5
-rw-r--r--src/rabbit_mirror_queue_sync.erl56
2 files changed, 34 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 545f2219c8..ff1eccaf19 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,8 +145,9 @@ sync_mirrors(SPids, State = #state { name = Name,
Ref = make_ref(),
%% We send the start over GM to flush out any other messages that
%% we might have sent that way already.
- gm:broadcast(GM, {sync_start, Ref, self(), SPids}),
- BQS1 = rabbit_mirror_queue_sync:master(Name, Ref, SPids, BQ, BQS),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Name, Ref, SPids, BQ, BQS),
+ gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref),
rabbit_log:info("Synchronising ~s: complete~n",
[rabbit_misc:rs(Name)]),
State#state{backing_queue_state = BQS1}.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 36e9f1eb9f..978d940c5a 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,13 +18,26 @@
-include("rabbit.hrl").
--export([master/5, slave/6]).
+-export([master_prepare/5, master_go/2, slave/6]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
%% ---------------------------------------------------------------------------
-master(Name, Ref, SPids, BQ, BQS) ->
+master_prepare(Name, Ref, SPids, BQ, BQS) ->
+ MPid = self(),
+ spawn_link(fun () -> master(Name, Ref, MPid, SPids, BQ, BQS) end).
+
+master_go(Syncer, Ref) ->
+ Syncer ! {go, Ref},
+ receive
+ {done, Ref, BQS1} -> BQS1
+ end.
+
+master(Name, Ref, MPid, SPids, BQ, BQS) ->
+ receive
+ {go, Ref} -> ok
+ end,
SPidsMRefs = [begin
MRef = erlang:monitor(process, SPid),
{SPid, MRef}
@@ -57,7 +70,8 @@ master(Name, Ref, SPids, BQ, BQS) ->
end}
end, {0, SPidsMRefs1, erlang:now()}, BQS),
foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3),
- BQS1.
+ MPid ! {done, Ref, BQS1},
+ unlink(MPid).
wait_for_credit(SPidsMRefs, Ref) ->
case credit_flow:blocked() of
@@ -76,36 +90,28 @@ sync_receive_ready(SPid, MRef, Ref) ->
{'DOWN', MRef, _, SPid, _} -> dead
end.
-sync_receive_credit(SPid, MRef, Ref) ->
+sync_receive_credit(SPid, MRef, _Ref) ->
receive
{bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
- sync_receive_credit(SPid, MRef, Ref);
+ SPid;
{'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
dead
- after 0 ->
- SPid
end.
-sync_receive_complete(SPid, MRef, Ref) ->
- SPid ! {sync_complete, Ref},
- receive
- {sync_complete_ok, Ref, SPid} -> ok;
- {'DOWN', MRef, _, SPid, _} -> ok
- end,
- erlang:demonitor(MRef, [flush]),
- credit_flow:peer_down(SPid).
+sync_receive_complete(SPid, _MRef, Ref) ->
+ SPid ! {sync_complete, Ref}.
%% ---------------------------------------------------------------------------
-slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) ->
- MRef = erlang:monitor(process, MPid),
- MPid ! {sync_ready, Ref, self()},
+slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+ MRef = erlang:monitor(process, Syncer),
+ Syncer ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),
- slave_sync_loop({Ref, MRef, MPid, BQ, UpdateRamDuration}, TRef, BQS1).
+ slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS1).
-slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) ->
+slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDur}, TRef, BQS) ->
receive
- {'DOWN', MRef, process, MPid, _Reason} ->
+ {'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
%% half-synced state (with messages nearer the tail of the
%% queue); instead we have ones nearer the head. If we then
@@ -113,15 +119,15 @@ slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) ->
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
{_MsgCount, BQS1} = BQ:purge(BQS),
- credit_flow:peer_down(MPid),
+ credit_flow:peer_down(Syncer),
{failed, {TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
slave_sync_loop(Args, TRef, BQS);
{sync_complete, Ref} ->
- MPid ! {sync_complete_ok, Ref, self()},
+ Syncer ! {sync_complete_ok, Ref, self()},
erlang:demonitor(MRef),
- credit_flow:peer_down(MPid),
+ credit_flow:peer_down(Syncer),
{ok, {TRef, BQS}};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -133,7 +139,7 @@ slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) ->
{TRef2, BQS1} = UpdateRamDur(BQ, BQS),
slave_sync_loop(Args, TRef2, BQS1);
{sync_message, Ref, Msg, Props} ->
- credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
+ credit_flow:ack(Syncer, ?CREDIT_DISC_BOUND),
Props1 = Props#message_properties{needs_confirming = false},
BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
slave_sync_loop(Args, TRef, BQS1);