summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl7
-rw-r--r--src/rabbit_mirror_queue_slave.erl23
2 files changed, 19 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 542d724af1..9527ff30b9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -131,13 +131,16 @@ sync_mirrors([], Name, State) ->
rabbit_log:info("Synchronising ~s: nothing to do~n",
[rabbit_misc:rs(Name)]),
State;
-sync_mirrors(SPids, Name, State = #state { backing_queue = BQ,
+sync_mirrors(SPids, Name, State = #state { gm = GM,
+ backing_queue = BQ,
backing_queue_state = BQS }) ->
rabbit_log:info("Synchronising ~s with slaves ~p~n",
[rabbit_misc:rs(Name), SPids]),
Ref = make_ref(),
+ %% We send the start over GM to flush out any other messages that
+ %% we might have sent that way already.
+ gm:broadcast(GM, {sync_start, Ref, self(), SPids}),
SPidsMRefs = [begin
- SPid ! {sync_start, Ref, self()},
MRef = erlang:monitor(process, SPid),
{SPid, MRef}
end || SPid <- SPids],
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 311c6ca658..e7f26e6b0d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -222,6 +222,15 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, MPid},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ MRef = erlang:monitor(process, MPid),
+ MPid ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ noreply(
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}));
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -264,15 +273,6 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);
-handle_info({sync_start, Ref, MPid},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- MRef = erlang:monitor(process, MPid),
- MPid ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
- noreply(
- sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}));
-
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -367,6 +367,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).