summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl73
-rw-r--r--src/rabbit_mirror_queue_slave.erl63
-rw-r--r--src/rabbit_mirror_queue_sync.erl144
3 files changed, 160 insertions, 120 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a695d6f20d..cba3def7f3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -45,8 +45,6 @@
known_senders
}).
--define(SYNC_PROGRESS_INTERVAL, 1000000).
-
-ifdef(use_specs).
-export_type([death_fun/0, depth_fun/0]).
@@ -142,80 +140,11 @@ sync_mirrors(SPids, Name, State = #state { gm = GM,
%% We send the start over GM to flush out any other messages that
%% we might have sent that way already.
gm:broadcast(GM, {sync_start, Ref, self(), SPids}),
- SPidsMRefs = [begin
- MRef = erlang:monitor(process, SPid),
- {SPid, MRef}
- end || SPid <- SPids],
- %% We wait for a reply from the slaves so that we know they are in
- %% a receive block and will thus receive messages we send to them
- %% *without* those messages ending up in their gen_server2 pqueue.
- SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
- {{_, SPidsMRefs2, _}, BQS1} =
- BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
- receive
- {'EXIT', _Pid, Reason} ->
- throw({time_to_shutdown, Reason})
- after 0 ->
- ok
- end,
- SPMR1 = wait_for_credit(SPMR, Ref),
- [begin
- credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
- SPid ! {sync_message, Ref, Msg, MsgProps}
- end || {SPid, _} <- SPMR1],
- {I + 1, SPMR1,
- case timer:now_diff(erlang:now(), Last) >
- ?SYNC_PROGRESS_INTERVAL of
- true -> rabbit_log:info(
- "Synchronising ~s: ~p messages~n",
- [rabbit_misc:rs(Name), I]),
- erlang:now();
- false -> Last
- end}
- end, {0, SPidsMRefs1, erlang:now()}, BQS),
- sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3),
+ BQS1 = rabbit_mirror_queue_sync:master(Name, Ref, SPids, BQ, BQS),
rabbit_log:info("Synchronising ~s: complete~n",
[rabbit_misc:rs(Name)]),
State#state{backing_queue_state = BQS1}.
-wait_for_credit(SPidsMRefs, Ref) ->
- case credit_flow:blocked() of
- true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref,
- fun sync_receive_credit/3), Ref);
- false -> SPidsMRefs
- end.
-
-sync_foreach(SPidsMRefs, Ref, Fun) ->
- [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
- SPid1 <- [Fun(SPid, MRef, Ref)],
- SPid1 =/= dead].
-
-sync_receive_ready(SPid, MRef, Ref) ->
- receive
- {sync_ready, Ref, SPid} -> SPid;
- {'DOWN', MRef, _, SPid, _} -> dead
- end.
-
-sync_receive_credit(SPid, MRef, Ref) ->
- receive
- {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
- sync_receive_credit(SPid, MRef, Ref);
- {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
- dead
- after 0 ->
- SPid
- end.
-
-sync_receive_complete(SPid, MRef, Ref) ->
- SPid ! {sync_complete, Ref},
- receive
- {sync_complete_ok, Ref, SPid} -> ok;
- {'DOWN', MRef, _, SPid, _} -> ok
- end,
- erlang:demonitor(MRef, [flush]),
- credit_flow:peer_down(SPid).
-
-
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index bb8def3de3..93ba882b58 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -225,10 +225,16 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
handle_cast({sync_start, Ref, MPid},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- MRef = erlang:monitor(process, MPid),
- MPid ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
- sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1});
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ %% [0] We can only sync when there are no pending acks
+ %% [1] The master died so we do not need to set_delta even though
+ %% we purged since we will get a depth instruction soon.
+ case rabbit_mirror_queue_sync:slave(Ref, MPid, BQ, BQS,
+ fun update_ram_duration/2) of
+ {ok, BQS1} -> noreply(set_delta(0, S(BQS1))); %% [0]
+ {failed, BQS1} -> noreply(S(BQS1)); %% [1]
+ {stop, R, BQS1} -> {stop, R, S(BQS1)}
+ end;
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -824,12 +830,14 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
update_ram_duration(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
+ State#state{rate_timer_ref = just_measured,
+ backing_queue_state = update_ram_duration(BQ, BQS)}.
+
+update_ram_duration(BQ, BQS) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- State #state { rate_timer_ref = just_measured,
- backing_queue_state = BQS2 }.
+ BQ:set_ram_duration_target(DesiredDuration, BQS1).
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
@@ -844,44 +852,3 @@ record_synchronised(#amqqueue { name = QName }) ->
ok
end
end).
-
-sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- receive
- {'DOWN', MRef, process, MPid, _Reason} ->
- %% If the master dies half way we are not in the usual
- %% half-synced state (with messages nearer the tail of the
- %% queue; instead we have ones nearer the head. If we then
- %% sync with a newly promoted master, or even just receive
- %% messages from it, we have a hole in the middle. So the
- %% only thing to do here is purge.)
- {_MsgCount, BQS1} = BQ:purge(BQS),
- credit_flow:peer_down(MPid),
- noreply(State#state{backing_queue_state = BQS1});
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg),
- sync_loop(Ref, MRef, MPid, State);
- {sync_complete, Ref} ->
- MPid ! {sync_complete_ok, Ref, self()},
- erlang:demonitor(MRef),
- credit_flow:peer_down(MPid),
- %% We can only sync when there are no pending acks
- noreply(set_delta(0, State));
- {'$gen_cast', {set_maximum_since_use, Age}} ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- sync_loop(Ref, MRef, MPid, State);
- {'$gen_cast', {set_ram_duration_target, Duration}} ->
- BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- sync_loop(Ref, MRef, MPid,
- State#state{backing_queue_state = BQS1});
- update_ram_duration ->
- sync_loop(Ref, MRef, MPid, update_ram_duration(State));
- {sync_message, Ref, Msg, Props0} ->
- credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
- Props = Props0#message_properties{needs_confirming = false,
- delivered = true},
- BQS1 = BQ:publish(Msg, Props, none, BQS),
- sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1});
- {'EXIT', _Pid, Reason} ->
- {stop, Reason, State}
- end.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 0000000000..1a7cdbb969
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,144 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master/5, slave/5]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% ---------------------------------------------------------------------------
+
+%% Master side of the mirror synchronisation protocol.
+%%
+%% Monitors every pid in SPids, waits for each of them to either answer
+%% {sync_ready, Ref, SPid} or die, and then folds over the backing queue
+%% with BQ:fold/3, sending every message to the surviving slaves as
+%% {sync_message, Ref, Msg, MsgProps} under credit_flow control. Once the
+%% fold is done the {sync_complete, Ref} handshake is run with the slaves
+%% that are still alive.
+%%
+%% Returns the backing queue state produced by the fold. Throws
+%% {time_to_shutdown, Reason} if an {'EXIT', _, Reason} message is found
+%% in the mailbox while folding.
+master(Name, Ref, SPids, BQ, BQS) ->
+ SPidsMRefs = [begin
+ MRef = erlang:monitor(process, SPid),
+ {SPid, MRef}
+ end || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
+ %% The fold accumulator is {messages handled so far, slaves still
+ %% alive, time the last progress line was logged}.
+ {{_, SPidsMRefs2, _}, BQS1} =
+ BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
+ %% Non-blocking poll ('after 0') for an exit message before
+ %% each message is sent.
+ receive
+ {'EXIT', _Pid, Reason} ->
+ throw({time_to_shutdown, Reason})
+ after 0 ->
+ ok
+ end,
+ %% wait_for_credit/2 returns only the slaves that have not
+ %% been seen to die while we were waiting.
+ SPMR1 = wait_for_credit(SPMR, Ref),
+ [begin
+ credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
+ SPid ! {sync_message, Ref, Msg, MsgProps}
+ end || {SPid, _} <- SPMR1],
+ %% Log progress only when more than ?SYNC_PROGRESS_INTERVAL
+ %% microseconds have passed since the previous progress line;
+ %% I is the count before the current message is added.
+ {I + 1, SPMR1,
+ case timer:now_diff(erlang:now(), Last) >
+ ?SYNC_PROGRESS_INTERVAL of
+ true -> rabbit_log:info(
+ "Synchronising ~s: ~p messages~n",
+ [rabbit_misc:rs(Name), I]),
+ erlang:now();
+ false -> Last
+ end}
+ end, {0, SPidsMRefs1, erlang:now()}, BQS),
+ %% Called for its side effects only (handshake, demonitor, credit
+ %% cleanup); the resulting list is discarded.
+ sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3),
+ BQS1.
+
+%% Returns the {SPid, MRef} pairs that should still be sent to once
+%% credit_flow:blocked/0 reports false. While it reports true, each pass
+%% runs sync_receive_credit/3 over the slaves, which handles pending credit
+%% bumps and removes slaves whose monitor has fired.
+%% NOTE(review): sync_receive_credit/3 polls with 'after 0', so this
+%% recursion does not block between passes.
+wait_for_credit(SPidsMRefs, Ref) ->
+    wait_for_credit(credit_flow:blocked(), SPidsMRefs, Ref).
+
+%% Dispatches on the result of credit_flow:blocked/0.
+wait_for_credit(false, SPidsMRefs, _Ref) ->
+    SPidsMRefs;
+wait_for_credit(true, SPidsMRefs, Ref) ->
+    Alive = sync_foreach(SPidsMRefs, Ref, fun sync_receive_credit/3),
+    wait_for_credit(Alive, Ref).
+
+%% Calls Fun(SPid, MRef, Ref) for each {SPid, MRef} pair, in list order,
+%% and keeps exactly those pairs for which the result is not the atom
+%% 'dead'. Elements that are not two-tuples are dropped.
+sync_foreach(SPidsMRefs, Ref, Fun) ->
+    lists:filter(fun ({SPid, MRef}) -> Fun(SPid, MRef, Ref) =/= dead;
+                     (_NotAPair)    -> false
+                 end, SPidsMRefs).
+
+%% Blocks until SPid announces {sync_ready, Ref, SPid} or its monitor
+%% MRef fires. Returns SPid in the first case and the atom 'dead' in the
+%% second. There is no 'after' clause, so the wait has no timeout.
+sync_receive_ready(SPid, MRef, Ref) ->
+ receive
+ {sync_ready, Ref, SPid} -> SPid;
+ {'DOWN', MRef, _, SPid, _} -> dead
+ end.
+
+%% Processes, without blocking, the messages already in the mailbox that
+%% concern SPid. Every {bump_credit, {SPid, _}} is handed to
+%% credit_flow:handle_bump_msg/1 and the poll is repeated. A 'DOWN' for
+%% MRef triggers credit_flow:peer_down(SPid) and yields 'dead'. When
+%% neither kind of message is pending, SPid is returned.
+%% Ref is not matched on here; it is only passed through the recursion.
+sync_receive_credit(SPid, MRef, Ref) ->
+ receive
+ {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
+ sync_receive_credit(SPid, MRef, Ref);
+ {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
+ dead
+ %% 'after 0': return immediately if nothing relevant is queued.
+ after 0 ->
+ SPid
+ end.
+
+%% Finishes the sync with one slave: sends it {sync_complete, Ref}, then
+%% blocks until it acknowledges with {sync_complete_ok, Ref, SPid} or its
+%% monitor MRef fires. Either way the monitor is then removed (flushing
+%% any 'DOWN' already delivered) and credit_flow:peer_down(SPid) is
+%% called; the result of that call is the return value.
+sync_receive_complete(SPid, MRef, Ref) ->
+ SPid ! {sync_complete, Ref},
+ receive
+ {sync_complete_ok, Ref, SPid} -> ok;
+ {'DOWN', MRef, _, SPid, _} -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(SPid).
+
+%% ---------------------------------------------------------------------------
+
+%% Slave side entry point of the sync protocol. Sets up a monitor on the
+%% master, tells it we are ready with {sync_ready, Ref, self()}, purges
+%% the local backing queue and then enters slave_sync_loop/6.
+%%
+%% Returns whatever the loop returns: {ok, BQS}, {failed, BQS} or
+%% {stop, Reason, BQS}.
+slave(Ref, MPid, BQ, BQS, UpdateRamDuration) ->
+    MasterMRef = erlang:monitor(process, MPid),
+    MPid ! {sync_ready, Ref, self()},
+    {_Purged, EmptyBQS} = BQ:purge(BQS),
+    slave_sync_loop(Ref, MasterMRef, MPid, BQ, EmptyBQS, UpdateRamDuration).
+
+%% Receive loop run by a slave while the master streams its queue to it.
+%%
+%% Ref               - reference identifying this sync run
+%% MRef, MPid        - monitor on, and pid of, the master
+%% BQ, BQS           - backing queue module and its current state
+%% UpdateRamDuration - fun(BQ, BQS) -> BQS1, invoked when an
+%%                     'update_ram_duration' message arrives
+%%
+%% Returns one of:
+%%   {ok, BQS}           - the master sent {sync_complete, Ref}
+%%   {failed, BQS}       - the master died; the queue has been purged
+%%   {stop, Reason, BQS} - an {'EXIT', _, Reason} message was received
+slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration) ->
+    receive
+        {'DOWN', MRef, process, MPid, _Reason} ->
+            %% If the master dies half way we are not in the usual
+            %% half-synced state (with messages nearer the tail of the
+            %% queue); instead we have ones nearer the head. If we then
+            %% sync with a newly promoted master, or even just receive
+            %% messages from it, we have a hole in the middle. So the
+            %% only thing to do here is purge.
+            {_MsgCount, BQS1} = BQ:purge(BQS),
+            credit_flow:peer_down(MPid),
+            {failed, BQS1};
+        {bump_credit, Msg} ->
+            credit_flow:handle_bump_msg(Msg),
+            slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration);
+        {sync_complete, Ref} ->
+            MPid ! {sync_complete_ok, Ref, self()},
+            %% Flush as well as demonitor, so that a 'DOWN' which raced
+            %% with the handshake cannot be left behind in the mailbox
+            %% for the calling process to receive after we return.
+            erlang:demonitor(MRef, [flush]),
+            credit_flow:peer_down(MPid),
+            {ok, BQS};
+        %% The two casts below are handled here because this loop, not
+        %% the caller's usual message handling, is reading the mailbox.
+        {'$gen_cast', {set_maximum_since_use, Age}} ->
+            ok = file_handle_cache:set_maximum_since_use(Age),
+            slave_sync_loop(Ref, MRef, MPid, BQ, BQS, UpdateRamDuration);
+        {'$gen_cast', {set_ram_duration_target, Duration}} ->
+            BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+            slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+        update_ram_duration ->
+            BQS1 = UpdateRamDuration(BQ, BQS),
+            slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+        {sync_message, Ref, Msg, Props0} ->
+            %% Acknowledge to credit_flow before publishing the message.
+            credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
+            Props = Props0#message_properties{needs_confirming = false,
+                                              delivered        = true},
+            BQS1 = BQ:publish(Msg, Props, none, BQS),
+            slave_sync_loop(Ref, MRef, MPid, BQ, BQS1, UpdateRamDuration);
+        {'EXIT', _Pid, Reason} ->
+            {stop, Reason, BQS}
+    end.