summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl26
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
-rw-r--r--src/rabbit_mirror_queue_sync.erl44
3 files changed, 44 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 439d1f4b1a..0820f3f9e4 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,24 +127,16 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors(State = #state{name = QName}) ->
- {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
- rabbit_amqqueue:lookup(QName),
- sync_mirrors(SPids -- SSPids, State).
-
-sync_mirrors([], State = #state{name = QName}) ->
- rabbit_log:info("Synchronising ~s: nothing to do~n",
- [rabbit_misc:rs(QName)]),
- {ok, State};
-sync_mirrors(SPids, State = #state { name = QName,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n",
- [rabbit_misc:rs(QName), SPids, BQ:len(BQS)]),
+sync_mirrors(State = #state { name = QName,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ rabbit_log:info("Synchronising ~s: ~p messages to synchronise~n",
+ [rabbit_misc:rs(QName), BQ:len(BQS)]),
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
Ref = make_ref(),
- Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, SPids),
- gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, SPids),
+ gm:broadcast(GM, {sync_start, Ref, Syncer}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(Syncer, Ref, QName, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 11490b9ca0..2b216a5f0f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -222,8 +222,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
-handle_cast({sync_start, Ref, MPid},
- State = #state { backing_queue = BQ,
+handle_cast({sync_start, Ref, Syncer},
+ State = #state { depth_delta = DD,
+ backing_queue = BQ,
backing_queue_state = BQS }) ->
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
S = fun({TRefN, BQSN}) -> State1#state{rate_timer_ref = TRefN,
@@ -232,7 +233,7 @@ handle_cast({sync_start, Ref, MPid},
%% [1] The master died so we do not need to set_delta even though
%% we purged since we will get a depth instruction soon.
case rabbit_mirror_queue_sync:slave(
- Ref, TRef, MPid, BQ, BQS,
+ DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
BQSN1 = update_ram_duration(BQN, BQSN),
TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
@@ -374,11 +375,8 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
-handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) ->
- case lists:member(SPid, SPids) of
- true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid});
- false -> ok
- end;
+handle_msg([SPid], _From, {sync_start, Ref, Syncer}) ->
+ gen_server2:cast(SPid, {sync_start, Ref, Syncer});
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index acf3e3e3c6..9ff853d59b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/2, master_go/5, slave/6]).
+-export([master_prepare/3, master_go/5, slave/7]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -51,12 +51,12 @@
%% ---------------------------------------------------------------------------
%% Master
-master_prepare(Ref, SPids) ->
+master_prepare(Ref, QName, SPids) ->
MPid = self(),
- spawn_link(fun () -> syncer(Ref, MPid, SPids) end).
+ spawn_link(fun () -> syncer(Ref, QName, MPid, SPids) end).
-master_go(Syncer, Ref, Name, BQ, BQS) ->
- SendArgs = {Syncer, Ref, Name},
+master_go(Syncer, Ref, QName, BQ, BQS) ->
+ SendArgs = {Syncer, Ref, QName},
{Acc, BQS1} =
BQ:fold(fun (Msg, MsgProps, {I, Last}) ->
master_send(SendArgs, I, Last, Msg, MsgProps)
@@ -70,11 +70,11 @@ master_go(Syncer, Ref, Name, BQ, BQS) ->
_ -> {ok, BQS1}
end.
-master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
+master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) ->
Acc = {I + 1,
case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> rabbit_log:info("Synchronising ~s: ~p messages~n",
- [rabbit_misc:rs(Name), I]),
+ [rabbit_misc:rs(QName), I]),
erlang:now();
false -> Last
end},
@@ -88,14 +88,23 @@ master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
%% ---------------------------------------------------------------------------
%% Syncer
-syncer(Ref, MPid, SPids) ->
+syncer(Ref, QName, MPid, SPids) ->
SPidsMRefs = [{SPid, erlang:monitor(process, SPid)} || SPid <- SPids],
%% We wait for a reply from the slaves so that we know they are in
%% a receive block and will thus receive messages we send to them
%% *without* those messages ending up in their gen_server2 pqueue.
- SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3),
- SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
- foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3),
+ case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of
+ [] ->
+ rabbit_log:info("Synchronising ~s: all slaves already synced~n",
+ [rabbit_misc:rs(QName)]);
+ SPidsMRefs1 ->
+ rabbit_log:info("Synchronising ~s: ~p require sync~n",
+ [rabbit_misc:rs(QName),
+ [rabbit_misc:pid_to_string(S) ||
+ {S, _} <- SPidsMRefs1]]),
+ SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1),
+ foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3)
+ end,
unlink(MPid).
syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
@@ -121,12 +130,13 @@ wait_for_credit(SPidsMRefs, Ref) ->
foreach_slave(SPidsMRefs, Ref, Fun) ->
[{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
- Fun(SPid, MRef, Ref) =/= dead].
+ Fun(SPid, MRef, Ref) =/= ignore].
sync_receive_ready(SPid, MRef, Ref) ->
receive
{sync_ready, Ref, SPid} -> SPid;
- {'DOWN', MRef, _, SPid, _} -> dead
+ {sync_deny, Ref, SPid} -> ignore;
+ {'DOWN', MRef, _, SPid, _} -> ignore
end.
sync_receive_credit(SPid, MRef, _Ref) ->
@@ -134,7 +144,7 @@ sync_receive_credit(SPid, MRef, _Ref) ->
{bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
SPid;
{'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
- dead
+ ignore
end.
sync_send_complete(SPid, _MRef, Ref) ->
@@ -144,7 +154,11 @@ sync_send_complete(SPid, _MRef, Ref) ->
%% ---------------------------------------------------------------------------
%% Slave
-slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+slave(0, Ref, TRef, Syncer, _BQ, BQS, _UpdateRamDuration) ->
+ Syncer ! {sync_deny, Ref, self()},
+ {ok, {TRef, BQS}};
+
+slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
Syncer ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),