diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 144 |
3 files changed, 160 insertions, 120 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a695d6f20d..cba3def7f3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -45,8 +45,6 @@ known_senders }). --define(SYNC_PROGRESS_INTERVAL, 1000000). - -ifdef(use_specs). -export_type([death_fun/0, depth_fun/0]). @@ -142,80 +140,11 @@ sync_mirrors(SPids, Name, State = #state { gm = GM, %% We send the start over GM to flush out any other messages that %% we might have sent that way already. gm:broadcast(GM, {sync_start, Ref, self(), SPids}), - SPidsMRefs = [begin - MRef = erlang:monitor(process, SPid), - {SPid, MRef} - end || SPid <- SPids], - %% We wait for a reply from the slaves so that we know they are in - %% a receive block and will thus receive messages we send to them - %% *without* those messages ending up in their gen_server2 pqueue. - SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), - {{_, SPidsMRefs2, _}, BQS1} = - BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> - receive - {'EXIT', _Pid, Reason} -> - throw({time_to_shutdown, Reason}) - after 0 -> - ok - end, - SPMR1 = wait_for_credit(SPMR, Ref), - [begin - credit_flow:send(SPid, ?CREDIT_DISC_BOUND), - SPid ! {sync_message, Ref, Msg, MsgProps} - end || {SPid, _} <- SPMR1], - {I + 1, SPMR1, - case timer:now_diff(erlang:now(), Last) > - ?SYNC_PROGRESS_INTERVAL of - true -> rabbit_log:info( - "Synchronising ~s: ~p messages~n", - [rabbit_misc:rs(Name), I]), - erlang:now(); - false -> Last - end} - end, {0, SPidsMRefs1, erlang:now()}, BQS), - sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), + BQS1 = rabbit_mirror_queue_sync:master(Name, Ref, SPids, BQ, BQS), rabbit_log:info("Synchronising ~s: complete~n", [rabbit_misc:rs(Name)]), State#state{backing_queue_state = BQS1}. -wait_for_credit(SPidsMRefs, Ref) -> - case credit_flow:blocked() of - true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, - fun sync_receive_credit/3), Ref); - false -> SPidsMRefs - end. - -sync_foreach(SPidsMRefs, Ref, Fun) -> - [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, - SPid1 <- [Fun(SPid, MRef, Ref)], - SPid1 =/= dead]. - -sync_receive_ready(SPid, MRef, Ref) -> - receive - {sync_ready, Ref, SPid} -> SPid; - {'DOWN', MRef, _, SPid, _} -> dead - end. - -sync_receive_credit(SPid, MRef, Ref) -> - receive - {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), - sync_receive_credit(SPid, MRef, Ref); - {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), - dead - after 0 -> - SPid - end. - -sync_receive_complete(SPid, MRef, Ref) -> - SPid ! {sync_complete, Ref}, - receive - {sync_complete_ok, Ref, SPid} -> ok; - {'DOWN', MRef, _, SPid, _} -> ok - end, - erlang:demonitor(MRef, [flush]), - credit_flow:peer_down(SPid). - - terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index bb8def3de3..93ba882b58 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -225,10 +225,16 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, handle_cast({sync_start, Ref, MPid}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - MRef = erlang:monitor(process, MPid), - MPid ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), - sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}); + S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end, + %% [0] We can only sync when there are no pending acks + %% [1] The master died so we do not need to set_delta even though + %% we purged since we will get a depth instruction soon. + case rabbit_mirror_queue_sync:slave(Ref, MPid, BQ, BQS, + fun update_ram_duration/2) of + {ok, BQS1} -> noreply(set_delta(0, S(BQS1))); %% [0] + {failed, BQS1} -> noreply(S(BQS1)); %% [1] + {stop, R, BQS1} -> {stop, R, S(BQS1)} + end; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -824,12 +830,14 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> update_ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State#state{rate_timer_ref = just_measured, + backing_queue_state = update_ram_duration(BQ, BQS)}. + +update_ram_duration(BQ, BQS) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }. + BQ:set_ram_duration_target(DesiredDuration, BQS1). record_synchronised(#amqqueue { name = QName }) -> Self = self(), @@ -844,44 +852,3 @@ record_synchronised(#amqqueue { name = QName }) -> ok end end). - -sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, - backing_queue_state = BQS}) -> - receive - {'DOWN', MRef, process, MPid, _Reason} -> - %% If the master dies half way we are not in the usual - %% half-synced state (with messages nearer the tail of the - %% queue; instead we have ones nearer the head. If we then - %% sync with a newly promoted master, or even just receive - %% messages from it, we have a hole in the middle. So the - %% only thing to do here is purge.) - {_MsgCount, BQS1} = BQ:purge(BQS), - credit_flow:peer_down(MPid), - noreply(State#state{backing_queue_state = BQS1}); - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg), - sync_loop(Ref, MRef, MPid, State); - {sync_complete, Ref} -> - MPid ! {sync_complete_ok, Ref, self()}, - erlang:demonitor(MRef), - credit_flow:peer_down(MPid), - %% We can only sync when there are no pending acks - noreply(set_delta(0, State)); - {'$gen_cast', {set_maximum_since_use, Age}} -> - ok = file_handle_cache:set_maximum_since_use(Age), - sync_loop(Ref, MRef, MPid, State); - {'$gen_cast', {set_ram_duration_target, Duration}} -> - BQS1 = BQ:set_ram_duration_target(Duration, BQS), - sync_loop(Ref, MRef, MPid, - State#state{backing_queue_state = BQS1}); - update_ram_duration -> - sync_loop(Ref, MRef, MPid, update_ram_duration(State)); - {sync_message, Ref, Msg, Props0} -> - credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), - Props = Props0#message_properties{needs_confirming = false, - delivered = true}, - BQS1 = BQ:publish(Msg, Props, none, BQS), - sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}); - {'EXIT', _Pid, Reason} -> - {stop, Reason, State} - end. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl new file mode 100644 index 0000000000..1a7cdbb969 --- /dev/null +++ b/src/rabbit_mirror_queue_sync.erl @@ -0,0 +1,144 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_sync). + +-include("rabbit.hrl"). + +-export([master/5, slave/5]). + +-define(SYNC_PROGRESS_INTERVAL, 1000000). + +%% --------------------------------------------------------------------------- + +master(Name, Ref, SPids, BQ, BQS) -> + SPidsMRefs = [begin + MRef = erlang:monitor(process, SPid), + {SPid, MRef} + end || SPid <- SPids], + %% We wait for a reply from the slaves so that we know they are in + %% a receive block and will thus receive messages we send to them + %% *without* those messages ending up in their gen_server2 pqueue. + SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), + {{_, SPidsMRefs2, _}, BQS1} = + BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> + receive + {'EXIT', _Pid, Reason} -> + throw({time_to_shutdown, Reason}) + after 0 -> + ok + end, + SPMR1 = wait_for_credit(SPMR, Ref), + [begin + credit_flow:send(SPid, ?CREDIT_DISC_BOUND), + SPid ! {sync_message, Ref, Msg, MsgProps} + end || {SPid, _} <- SPMR1], + {I + 1, SPMR1, + case timer:now_diff(erlang:now(), Last) > + ?SYNC_PROGRESS_INTERVAL of + true -> rabbit_log:info( + "Synchronising ~s: ~p messages~n", + [rabbit_misc:rs(Name), I]), + erlang:now(); + false -> Last + end} + end, {0, SPidsMRefs1, erlang:now()}, BQS), + sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), + BQS1. + +wait_for_credit(SPidsMRefs, Ref) -> + case credit_flow:blocked() of + true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, + fun sync_receive_credit/3), Ref); + false -> SPidsMRefs + end. + +sync_foreach(SPidsMRefs, Ref, Fun) -> + [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, + SPid1 <- [Fun(SPid, MRef, Ref)], + SPid1 =/= dead]. + +sync_receive_ready(SPid, MRef, Ref) -> + receive + {sync_ready, Ref, SPid} -> SPid; + {'DOWN', MRef, _, SPid, _} -> dead + end. + +sync_receive_credit(SPid, MRef, Ref) -> + receive + {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), + sync_receive_credit(SPid, MRef, Ref); + {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), + dead + after 0 -> + SPid + end. + +sync_receive_complete(SPid, MRef, Ref) -> + SPid ! {sync_complete, Ref}, + receive + {sync_complete_ok, Ref, SPid} -> ok; + {'DOWN', MRef, _, SPid, _} -> ok + end, + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(SPid). + +%% --------------------------------------------------------------------------- + +slave(Ref, MPid, BQ, BQS, UpdateRamDuration) -> + MRef = erlang:monitor(process, MPid), + MPid ! {sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration). + +slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration) -> + receive + {'DOWN', MRef, process, MPid, _Reason} -> + %% If the master dies half way we are not in the usual + %% half-synced state (with messages nearer the tail of the + %% queue; instead we have ones nearer the head. If we then + %% sync with a newly promoted master, or even just receive + %% messages from it, we have a hole in the middle. So the + %% only thing to do here is purge.) + {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(MPid), + {failed, BQS1}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration); + {sync_complete, Ref} -> + MPid ! {sync_complete_ok, Ref, self()}, + erlang:demonitor(MRef), + credit_flow:peer_down(MPid), + {ok, BQS}; + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration); + {'$gen_cast', {set_ram_duration_target, Duration}} -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + update_ram_duration -> + BQS1 = UpdateRamDuration(BQ, BQS), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + {sync_message, Ref, Msg, Props0} -> + credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), + Props = Props0#message_properties{needs_confirming = false, + delivered = true}, + BQS1 = BQ:publish(Msg, Props, none, BQS), + slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration); + {'EXIT', _Pid, Reason} -> + {stop, Reason, BQS} + end. |
