diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 44 |
3 files changed, 44 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 439d1f4b1a..0820f3f9e4 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -127,24 +127,16 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors(State = #state{name = QName}) -> - {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(QName), - sync_mirrors(SPids -- SSPids, State). - -sync_mirrors([], State = #state{name = QName}) -> - rabbit_log:info("Synchronising ~s: nothing to do~n", - [rabbit_misc:rs(QName)]), - {ok, State}; -sync_mirrors(SPids, State = #state { name = QName, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n", - [rabbit_misc:rs(QName), SPids, BQ:len(BQS)]), +sync_mirrors(State = #state { name = QName, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + rabbit_log:info("Synchronising ~s: ~p messages to synchronise~n", + [rabbit_misc:rs(QName), BQ:len(BQS)]), + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), Ref = make_ref(), - Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, SPids), - gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, SPids), + gm:broadcast(GM, {sync_start, Ref, Syncer}), S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, case rabbit_mirror_queue_sync:master_go(Syncer, Ref, QName, BQ, BQS) of {shutdown, R, BQS1} -> {stop, R, S(BQS1)}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 11490b9ca0..2b216a5f0f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,8 +222,9 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); -handle_cast({sync_start, Ref, MPid}, - State = #state { backing_queue = BQ, +handle_cast({sync_start, Ref, Syncer}, + State = #state { depth_delta = DD, + backing_queue = BQ, backing_queue_state = BQS }) -> State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), S = fun({TRefN, BQSN}) -> State1#state{rate_timer_ref = TRefN, @@ -232,7 +233,7 @@ handle_cast({sync_start, Ref, MPid}, %% [1] The master died so we do not need to set_delta even though %% we purged since we will get a depth instruction soon. case rabbit_mirror_queue_sync:slave( - Ref, TRef, MPid, BQ, BQS, + DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> BQSN1 = update_ram_duration(BQN, BQSN), TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, @@ -374,11 +375,8 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; -handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) -> - case lists:member(SPid, SPids) of - true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid}); - false -> ok - end; +handle_msg([SPid], _From, {sync_start, Ref, Syncer}) -> + gen_server2:cast(SPid, {sync_start, Ref, Syncer}); handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index acf3e3e3c6..9ff853d59b 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/2, master_go/5, slave/6]). +-export([master_prepare/3, master_go/5, slave/7]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -51,12 +51,12 @@ %% --------------------------------------------------------------------------- %% Master -master_prepare(Ref, SPids) -> +master_prepare(Ref, QName, SPids) -> MPid = self(), - spawn_link(fun () -> syncer(Ref, MPid, SPids) end). + spawn_link(fun () -> syncer(Ref, QName, MPid, SPids) end). -master_go(Syncer, Ref, Name, BQ, BQS) -> - SendArgs = {Syncer, Ref, Name}, +master_go(Syncer, Ref, QName, BQ, BQS) -> + SendArgs = {Syncer, Ref, QName}, {Acc, BQS1} = BQ:fold(fun (Msg, MsgProps, {I, Last}) -> master_send(SendArgs, I, Last, Msg, MsgProps) @@ -70,11 +70,11 @@ master_go(Syncer, Ref, Name, BQ, BQS) -> _ -> {ok, BQS1} end. -master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> +master_send({Syncer, Ref, QName}, I, Last, Msg, MsgProps) -> Acc = {I + 1, case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> rabbit_log:info("Synchronising ~s: ~p messages~n", - [rabbit_misc:rs(Name), I]), + [rabbit_misc:rs(QName), I]), erlang:now(); false -> Last end}, @@ -88,14 +88,23 @@ master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> %% --------------------------------------------------------------------------- %% Syncer -syncer(Ref, MPid, SPids) -> +syncer(Ref, QName, MPid, SPids) -> SPidsMRefs = [{SPid, erlang:monitor(process, SPid)} || SPid <- SPids], %% We wait for a reply from the slaves so that we know they are in %% a receive block and will thus receive messages we send to them %% *without* those messages ending up in their gen_server2 pqueue. - SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3), - SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), - foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3), + case foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3) of + [] -> + rabbit_log:info("Synchronising ~s: all slaves already synced~n", + [rabbit_misc:rs(QName)]); + SPidsMRefs1 -> + rabbit_log:info("Synchronising ~s: ~p require sync~n", + [rabbit_misc:rs(QName), + [rabbit_misc:pid_to_string(S) || + {S, _} <- SPidsMRefs1]]), + SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), + foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3) + end, unlink(MPid). syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> @@ -121,12 +130,13 @@ wait_for_credit(SPidsMRefs, Ref) -> foreach_slave(SPidsMRefs, Ref, Fun) -> [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, - Fun(SPid, MRef, Ref) =/= dead]. + Fun(SPid, MRef, Ref) =/= ignore]. sync_receive_ready(SPid, MRef, Ref) -> receive {sync_ready, Ref, SPid} -> SPid; - {'DOWN', MRef, _, SPid, _} -> dead + {sync_deny, Ref, SPid} -> ignore; + {'DOWN', MRef, _, SPid, _} -> ignore end. sync_receive_credit(SPid, MRef, _Ref) -> @@ -134,7 +144,7 @@ sync_receive_credit(SPid, MRef, _Ref) -> {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), SPid; {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), - dead + ignore end. sync_send_complete(SPid, _MRef, Ref) -> @@ -144,7 +154,11 @@ sync_send_complete(SPid, _MRef, Ref) -> %% --------------------------------------------------------------------------- %% Slave -slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> +slave(0, Ref, TRef, Syncer, _BQ, BQS, _UpdateRamDuration) -> + Syncer ! {sync_deny, Ref, self()}, + {ok, {TRef, BQS}}; + +slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), Syncer ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), |
