diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-11-28 13:02:50 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-11-28 13:02:50 +0000 |
| commit | 4eb5c2c0aac7910dc52dc23c3ebbc0ffcfaebfd6 (patch) | |
| tree | c812c8d87bd96ba062a35312fbb6b909a9ee8bbc | |
| parent | 06ad8fde700795a99363410a41ee1875ec491191 (diff) | |
| download | rabbitmq-server-git-4eb5c2c0aac7910dc52dc23c3ebbc0ffcfaebfd6.tar.gz | |
Spawn a separate process to send out messages from the master. This lets us simplify some monitor and credit_flow handling. Thus also stop sync_receive_credit from being a spinloop.
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 56 |
2 files changed, 34 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 545f2219c8..ff1eccaf19 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -145,8 +145,9 @@ sync_mirrors(SPids, State = #state { name = Name, Ref = make_ref(), %% We send the start over GM to flush out any other messages that %% we might have sent that way already. - gm:broadcast(GM, {sync_start, Ref, self(), SPids}), - BQS1 = rabbit_mirror_queue_sync:master(Name, Ref, SPids, BQ, BQS), + Syncer = rabbit_mirror_queue_sync:master_prepare(Name, Ref, SPids, BQ, BQS), + gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}), + BQS1 = rabbit_mirror_queue_sync:master_go(Syncer, Ref), rabbit_log:info("Synchronising ~s: complete~n", [rabbit_misc:rs(Name)]), State#state{backing_queue_state = BQS1}. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 36e9f1eb9f..978d940c5a 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,13 +18,26 @@ -include("rabbit.hrl"). --export([master/5, slave/6]). +-export([master_prepare/5, master_go/2, slave/6]). -define(SYNC_PROGRESS_INTERVAL, 1000000). %% --------------------------------------------------------------------------- -master(Name, Ref, SPids, BQ, BQS) -> +master_prepare(Name, Ref, SPids, BQ, BQS) -> + MPid = self(), + spawn_link(fun () -> master(Name, Ref, MPid, SPids, BQ, BQS) end). + +master_go(Syncer, Ref) -> + Syncer ! {go, Ref}, + receive + {done, Ref, BQS1} -> BQS1 + end. + +master(Name, Ref, MPid, SPids, BQ, BQS) -> + receive + {go, Ref} -> ok + end, SPidsMRefs = [begin MRef = erlang:monitor(process, SPid), {SPid, MRef} @@ -57,7 +70,8 @@ master(Name, Ref, SPids, BQ, BQS) -> end} end, {0, SPidsMRefs1, erlang:now()}, BQS), foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3), - BQS1. + MPid ! {done, Ref, BQS1}, + unlink(MPid). wait_for_credit(SPidsMRefs, Ref) -> case credit_flow:blocked() of @@ -76,36 +90,28 @@ sync_receive_ready(SPid, MRef, Ref) -> {'DOWN', MRef, _, SPid, _} -> dead end. -sync_receive_credit(SPid, MRef, Ref) -> +sync_receive_credit(SPid, MRef, _Ref) -> receive {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), - sync_receive_credit(SPid, MRef, Ref); + SPid; {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), dead - after 0 -> - SPid end. -sync_receive_complete(SPid, MRef, Ref) -> - SPid ! {sync_complete, Ref}, - receive - {sync_complete_ok, Ref, SPid} -> ok; - {'DOWN', MRef, _, SPid, _} -> ok - end, - erlang:demonitor(MRef, [flush]), - credit_flow:peer_down(SPid). +sync_receive_complete(SPid, _MRef, Ref) -> + SPid ! {sync_complete, Ref}. %% --------------------------------------------------------------------------- -slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) -> - MRef = erlang:monitor(process, MPid), - MPid ! {sync_ready, Ref, self()}, +slave(Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> + MRef = erlang:monitor(process, Syncer), + Syncer ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), - slave_sync_loop({Ref, MRef, MPid, BQ, UpdateRamDuration}, TRef, BQS1). + slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration}, TRef, BQS1). -slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) -> +slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDur}, TRef, BQS) -> receive - {'DOWN', MRef, process, MPid, _Reason} -> + {'DOWN', MRef, process, Syncer, _Reason} -> %% If the master dies half way we are not in the usual %% half-synced state (with messages nearer the tail of the %% queue); instead we have ones nearer the head. If we then @@ -113,15 +119,15 @@ slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) -> %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge. {_MsgCount, BQS1} = BQ:purge(BQS), - credit_flow:peer_down(MPid), + credit_flow:peer_down(Syncer), {failed, {TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), slave_sync_loop(Args, TRef, BQS); {sync_complete, Ref} -> - MPid ! {sync_complete_ok, Ref, self()}, + Syncer ! {sync_complete_ok, Ref, self()}, erlang:demonitor(MRef), - credit_flow:peer_down(MPid), + credit_flow:peer_down(Syncer), {ok, {TRef, BQS}}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -133,7 +139,7 @@ slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) -> {TRef2, BQS1} = UpdateRamDur(BQ, BQS), slave_sync_loop(Args, TRef2, BQS1); {sync_message, Ref, Msg, Props} -> - credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), + credit_flow:ack(Syncer, ?CREDIT_DISC_BOUND), Props1 = Props#message_properties{needs_confirming = false}, BQS1 = BQ:publish(Msg, Props1, true, none, BQS), slave_sync_loop(Args, TRef, BQS1); |
