diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 14:13:54 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-26 14:13:54 +0000 |
| commit | ef23ef410fa80c73cb84d43bde8cd80a994e6fcd (patch) | |
| tree | f4e06fa3b1a48a399d43cbce538ec430395bbdac | |
| parent | e3d8fac887180e047fffcdb2d1524b2a65eb8c1f (diff) | |
| download | rabbitmq-server-git-ef23ef410fa80c73cb84d43bde8cd80a994e6fcd.tar.gz | |
Send the sync_start over GM to flush out any other messages that we might have sent that way already.
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 23 |
2 files changed, 19 insertions, 11 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 542d724af1..9527ff30b9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -131,13 +131,16 @@ sync_mirrors([], Name, State) -> rabbit_log:info("Synchronising ~s: nothing to do~n", [rabbit_misc:rs(Name)]), State; -sync_mirrors(SPids, Name, State = #state { backing_queue = BQ, +sync_mirrors(SPids, Name, State = #state { gm = GM, + backing_queue = BQ, backing_queue_state = BQS }) -> rabbit_log:info("Synchronising ~s with slaves ~p~n", [rabbit_misc:rs(Name), SPids]), Ref = make_ref(), + %% We send the start over GM to flush out any other messages that + %% we might have sent that way already. + gm:broadcast(GM, {sync_start, Ref, self(), SPids}), SPidsMRefs = [begin - SPid ! {sync_start, Ref, self()}, MRef = erlang:monitor(process, SPid), {SPid, MRef} end || SPid <- SPids], diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 311c6ca658..e7f26e6b0d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,6 +222,15 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, MPid}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + MRef = erlang:monitor(process, MPid), + MPid ! {sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + noreply( + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})); + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -264,15 +273,6 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); -handle_info({sync_start, Ref, MPid}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - MRef = erlang:monitor(process, MPid), - MPid ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), - noreply( - sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})); - handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -367,6 +367,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) -> + case lists:member(SPid, SPids) of + true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). |
