diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-28 13:41:34 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-28 13:41:34 +0000 |
| commit | df6b71313d1cf383a4dca02a619077cbb28e11e4 (patch) | |
| tree | 574d15bb71b36e035412b8d8f71219e17f4fc405 | |
| parent | 4eb5c2c0aac7910dc52dc23c3ebbc0ffcfaebfd6 (diff) | |
| download | rabbitmq-server-git-df6b71313d1cf383a4dca02a619077cbb28e11e4.tar.gz | |
Don't expose the BQ to the syncer.
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 88 |
2 files changed, 52 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ff1eccaf19..04e868bcd2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,9 +145,9 @@ sync_mirrors(SPids, State = #state { name = Name, Ref = make_ref(), %% We send the start over GM to flush out any other messages that %% we might have sent that way already. - Syncer = rabbit_mirror_queue_sync:master_prepare(Name, Ref, SPids, BQ, BQS), + Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, SPids), gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), - BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref), + BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref, Name, BQ, BQS), rabbit_log:info("Synchronising ~s: complete~n", [rabbit_misc:rs(Name)]), State#state{backing_queue_state = BQS1}. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 978d940c5a..560e0d43c5 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,26 +18,45 @@ -include("rabbit.hrl"). --export([master_prepare/5, master_go/2, slave/6]). +-export([master_prepare/2, master_go/5, slave/6]). -define(SYNC_PROGRESS_INTERVAL, 1000000). %% --------------------------------------------------------------------------- +%% Master -master_prepare(Name, Ref, SPids, BQ, BQS) -> +master_prepare(Ref, SPids) -> MPid = self(), - spawn_link(fun () -> master(Name, Ref, MPid, SPids, BQ, BQS) end). - -master_go(Syncer, Ref) -> - Syncer ! {go, Ref}, + spawn_link(fun () -> syncer(Ref, MPid, SPids) end). + +master_go(Syncer, Ref, Name, BQ, BQS) -> + SendArgs = {Syncer, Ref, Name}, + {_, BQS1} = + BQ:fold(fun (Msg, MsgProps, {I, Last}) -> + {I + 1, master_send(SendArgs, I, Last, Msg, MsgProps)} + end, {0, erlang:now()}, BQS), + Syncer ! {done, Ref}, + BQS1. + +master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> + Syncer ! {msg, Ref, Msg, MsgProps}, receive - {done, Ref, BQS1} -> BQS1 + {msg_ok, Ref} -> ok; + {'EXIT', _Pid, Reason} -> throw({time_to_shutdown, Reason}) + end, + case timer:now_diff(erlang:now(), Last) > + ?SYNC_PROGRESS_INTERVAL of + true -> rabbit_log:info("Synchronising ~s: ~p messages~n", + [rabbit_misc:rs(Name), I]), + erlang:now(); + false -> Last end. -master(Name, Ref, MPid, SPids, BQ, BQS) -> - receive - {go, Ref} -> ok - end, +%% Master +%% --------------------------------------------------------------------------- +%% Syncer + +syncer(Ref, MPid, SPids) -> SPidsMRefs = [begin MRef = erlang:monitor(process, SPid), {SPid, MRef} @@ -46,33 +65,24 @@ master(Name, Ref, MPid, SPids, BQ, BQS) -> %% a receive block and will thus receive messages we send to them %% *without* those messages ending up in their gen_server2 pqueue. SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3), - {{_, SPidsMRefs2, _}, BQS1} = - BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> - receive - {'EXIT', _Pid, Reason} -> - throw({time_to_shutdown, Reason}) - after 0 -> - ok - end, - SPMR1 = wait_for_credit(SPMR, Ref), - [begin - credit_flow:send(SPid, ?CREDIT_DISC_BOUND), - SPid ! {sync_message, Ref, Msg, MsgProps} - end || {SPid, _} <- SPMR1], - {I + 1, SPMR1, - case timer:now_diff(erlang:now(), Last) > - ?SYNC_PROGRESS_INTERVAL of - true -> rabbit_log:info( - "Synchronising ~s: ~p messages~n", - [rabbit_misc:rs(Name), I]), - erlang:now(); - false -> Last - end} - end, {0, SPidsMRefs1, erlang:now()}, BQS), - foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3), - MPid ! {done, Ref, BQS1}, + SPidsMRefs2 = syncer_loop({Ref, MPid}, SPidsMRefs1), + foreach_slave(SPidsMRefs2, Ref, fun sync_send_complete/3), unlink(MPid). +syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> + receive + {msg, Ref, Msg, MsgProps} -> + MPid ! {msg_ok, Ref}, + SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref), + [begin + credit_flow:send(SPid, ?CREDIT_DISC_BOUND), + SPid ! {sync_msg, Ref, Msg, MsgProps} + end || {SPid, _} <- SPidsMRefs1], + syncer_loop(Args, SPidsMRefs1); + {done, Ref} -> + SPidsMRefs + end. + wait_for_credit(SPidsMRefs, Ref) -> case credit_flow:blocked() of true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref, @@ -98,10 +108,12 @@ sync_receive_credit(SPid, MRef, _Ref) -> dead end. -sync_receive_complete(SPid, _MRef, Ref) -> +sync_send_complete(SPid, _MRef, Ref) -> SPid ! {sync_complete, Ref}. +%% Syncer %% --------------------------------------------------------------------------- +%% Slave slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), @@ -138,7 +150,7 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDur}, TRef, BQS) -> update_ram_duration -> {TRef2, BQS1} = UpdateRamDur(BQ, BQS), slave_sync_loop(Args, TRef2, BQS1); - {sync_message, Ref, Msg, Props} -> + {sync_msg, Ref, Msg, Props} -> credit_flow:ack(Syncer, ?CREDIT_DISC_BOUND), Props1 = Props#message_properties{needs_confirming = false}, BQS1 = BQ:publish(Msg, Props1, true, none, BQS), |
