summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_sync.erl88
2 files changed, 52 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ff1eccaf19..04e868bcd2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,9 +145,9 @@ sync_mirrors(SPids, State = #state { name = Name,
Ref = make_ref(),
%% We send the start over GM to flush out any other messages that
%% we might have sent that way already.
- Syncer = rabbit_mirror_queue_sync:master_prepare(Name, Ref, SPids, BQ, BQS),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
- BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref),
+ BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref, Name, BQ, BQS),
rabbit_log:info("Synchronising ~s: complete~n",
[rabbit_misc:rs(Name)]),
State#state{backing_queue_state = BQS1}.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 978d940c5a..560e0d43c5 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,26 +18,45 @@
-include("rabbit.hrl").
--export([master_prepare/5, master_go/2, slave/6]).
+-export([master_prepare/2, master_go/5, slave/6]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
%% ---------------------------------------------------------------------------
+%% Master
-master_prepare(Name, Ref, SPids, BQ, BQS) ->
+master_prepare(Ref, SPids) ->
MPid = self(),
- spawn_link(fun () -> master(Name, Ref, MPid, SPids, BQ, BQS) end).
-
-master_go(Syncer, Ref) ->
- Syncer ! {go, Ref},
+ spawn_link(fun () -> syncer(Ref, MPid, SPids) end).
+
+master_go(Syncer, Ref, Name, BQ, BQS) ->
+ SendArgs = {Syncer, Ref, Name},
+ {_, BQS1} =
+ BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
+ {I + 1, master_send(SendArgs, I, Last, Msg, MsgProps)}
+ end, {0, erlang:now()}, BQS),
+ Syncer ! {done, Ref},
+ BQS1.
+
+master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
+ Syncer ! {msg, Ref, Msg, MsgProps},
receive
- {done, Ref, BQS1} -> BQS1
+ {msg_ok, Ref} -> ok;
+ {'EXIT', _Pid, Reason} -> throw({time_to_shutdown, Reason})
+ end,
+ case timer:now_diff(erlang:now(), Last) >
+ ?SYNC_PROGRESS_INTERVAL of
+ true -> rabbit_log:info("Synchronising ~s: ~p messages~n",
+ [rabbit_misc:rs(Name), I]),
+ erlang:now();
+ false -> Last
end.
-master(Name, Ref, MPid, SPids, BQ, BQS) ->
- receive
- {go, Ref} -> ok
- end,
+%% Master
+%% ---------------------------------------------------------------------------
+%% Syncer
+
+syncer(Ref, MPid, SPids) ->
SPidsMRefs = [begin
MRef = erlang:monitor(process, SPid),
{SPid, MRef}
@@ -46,33 +65,24 @@ master(Name, Ref, MPid, SPids, BQ, BQS) ->
%% a receive block and will thus receive messages we send to them
%% *without* those messages ending up in their gen_server2 pqueue.
SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3),
- {{_, SPidsMRefs2, _}, BQS1} =
- BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
- receive
- {'EXIT', _Pid, Reason} ->
- throw({time_to_shutdown, Reason})
- after 0 ->
- ok
- end,
- SPMR1 = wait_for_credit(SPMR, Ref),
- [begin
- credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
- SPid ! {sync_message, Ref, Msg, MsgProps}
- end || {SPid, _} <- SPMR1],
- {I + 1, SPMR1,
- case timer:now_diff(erlang:now(), Last) >
- ?SYNC_PROGRESS_INTERVAL of
- true -> rabbit_log:info(
- "Synchronising ~s: ~p messages~n",
- [rabbit_misc:rs(Name), I]),
- erlang:now();
- false -> Last
- end}
- end, {0, SPidsMRefs1, erlang:now()}, BQS),
- foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3),
- MPid ! {done, Ref, BQS1},
+ SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
+ foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3),
unlink(MPid).
+syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
+ receive
+ {msg, Ref, Msg, MsgProps} ->
+ MPid ! {msg_ok, Ref},
+ SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref),
+ [begin
+ credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
+ SPid ! {sync_msg, Ref, Msg, MsgProps}
+ end || {SPid, _} <- SPidsMRefs1],
+ syncer_loop(Args, SPidsMRefs1);
+ {done, Ref} ->
+ SPidsMRefs
+ end.
+
wait_for_credit(SPidsMRefs, Ref) ->
case credit_flow:blocked() of
true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref,
@@ -98,10 +108,12 @@ sync_receive_credit(SPid, MRef, _Ref) ->
dead
end.
-sync_receive_complete(SPid, _MRef, Ref) ->
+sync_send_complete(SPid, _MRef, Ref) ->
SPid ! {sync_complete, Ref}.
+%% Syncer
%% ---------------------------------------------------------------------------
+%% Slave
slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
@@ -138,7 +150,7 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDur}, TRef, BQS) ->
update_ram_duration ->
{TRef2, BQS1} = UpdateRamDur(BQ, BQS),
slave_sync_loop(Args, TRef2, BQS1);
- {sync_message, Ref, Msg, Props} ->
+ {sync_msg, Ref, Msg, Props} ->
credit_flow:ack(Syncer, ?CREDIT_DISC_BOUND),
Props1 = Props#message_properties{needs_confirming = false},
BQS1 = BQ:publish(Msg, Props1, true, none, BQS),