summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-27 18:30:03 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-27 18:30:03 +0000
commitfb167b72174df758dc35d04787f220a6947c1752 (patch)
tree95eb8bfebeb9b1881fb8e5cbd9a0286f2dca662f
parent615ff0d8f91b95112616c9af05b48bec93e5377b (diff)
parent0a808bbfd471334b3a4467e44ef4f07ee1869fc2 (diff)
downloadrabbitmq-server-git-fb167b72174df758dc35d04787f220a6947c1752.tar.gz
merge default into bug24407
...which involved a minor tweak since #message_properties.delivered is gone
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl22
-rw-r--r--src/rabbit_mirror_queue_master.erl20
-rw-r--r--src/rabbit_mirror_queue_slave.erl49
-rw-r--r--src/rabbit_mirror_queue_sync.erl143
5 files changed, 229 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7827b839d5..4bdab0bc01 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -173,6 +173,8 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) ->
+ 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
-endif.
@@ -590,6 +592,8 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
+%% Synchronously asks the queue process to bring its unsynchronised
+%% mirrors up to date. The result is whatever the queue process
+%% replies to the 'sync_mirrors' call.
+sync_mirrors(#amqqueue{pid = Pid}) ->
+    delegate_call(Pid, sync_mirrors).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 74717acee5..70dc8aee3d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1153,6 +1153,28 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+%% Explicit mirror synchronisation request. Only attempted when the
+%% backing queue is rabbit_mirror_queue_master and depth equals len;
+%% any difference is reported to the caller as 'pending_acks'.
+handle_call(sync_mirrors, From,
+            State = #q{q                   = #amqqueue{name = QName},
+                       backing_queue       = rabbit_mirror_queue_master = BQ,
+                       backing_queue_state = BQS}) ->
+    Outstanding = BQ:depth(BQS) - BQ:len(BQS),
+    case Outstanding of
+        0 ->
+            {ok, #amqqueue{slave_pids      = AllSPids,
+                           sync_slave_pids = SyncedSPids}} =
+                rabbit_amqqueue:lookup(QName),
+            %% Reply before starting: the caller is not kept waiting
+            %% for the synchronisation itself.
+            gen_server2:reply(From, ok),
+            Unsynced = AllSPids -- SyncedSPids,
+            try
+                BQS1 = rabbit_mirror_queue_master:sync_mirrors(
+                         Unsynced, QName, BQS),
+                noreply(State#q{backing_queue_state = BQS1})
+            catch
+                throw:{time_to_shutdown, Reason} ->
+                    {stop, Reason, State}
+            end;
+        _ ->
+            reply({error, pending_acks}, State)
+    end;
+
+%% Any other backing queue module: there are no mirrors to synchronise.
+handle_call(sync_mirrors, _From, State) ->
+    reply({error, not_mirrored}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c8a361b1e0..e19c1a09fe 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -124,6 +124,24 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+%% Synchronises the given slave pids with this master and returns the
+%% updated master state. With no slaves to synchronise it only logs
+%% and returns the state unchanged.
+sync_mirrors([], QName, State) ->
+    QStr = rabbit_misc:rs(QName),
+    rabbit_log:info("Synchronising ~s: nothing to do~n", [QStr]),
+    State;
+sync_mirrors(SPids, QName, State = #state{gm                  = GM,
+                                          backing_queue       = BQ,
+                                          backing_queue_state = BQS}) ->
+    StartArgs = [rabbit_misc:rs(QName), SPids, BQ:len(BQS)],
+    rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n",
+                    StartArgs),
+    Ref = make_ref(),
+    %% We send the start over GM to flush out any other messages that
+    %% we might have sent that way already.
+    gm:broadcast(GM, {sync_start, Ref, self(), SPids}),
+    SyncedBQS = rabbit_mirror_queue_sync:master(QName, Ref, SPids, BQ, BQS),
+    DoneArgs = [rabbit_misc:rs(QName)],
+    rabbit_log:info("Synchronising ~s: complete~n", DoneArgs),
+    State#state{backing_queue_state = SyncedBQS}.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9354f48514..320c07a626 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -222,6 +222,22 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+%% Runs the slave side of an explicit synchronisation with master
+%% MPid, then folds the timer ref and backing queue state it returns
+%% back into our state.
+handle_cast({sync_start, Ref, MPid},
+            State = #state{backing_queue       = BQ,
+                           backing_queue_state = BQS}) ->
+    State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+    Merge = fun ({NewTRef, NewBQS}) ->
+                    State1#state{rate_timer_ref      = NewTRef,
+                                 backing_queue_state = NewBQS}
+            end,
+    Outcome = rabbit_mirror_queue_sync:slave(
+                Ref, TRef, MPid, BQ, BQS, fun update_ram_duration_sync/2),
+    case Outcome of
+        {ok, Res} ->
+            %% We can only sync when there are no pending acks, so
+            %% after a successful sync the delta is 0.
+            noreply(set_delta(0, Merge(Res)));
+        {failed, Res} ->
+            %% The master died so we do not need to set_delta even
+            %% though we purged since we will get a depth instruction
+            %% soon.
+            noreply(Merge(Res));
+        {stop, Reason, Res} ->
+            {stop, Reason, Merge(Res)}
+    end;
+
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -232,15 +248,8 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State #state { rate_timer_ref = just_measured,
- backing_queue_state = BQS2 });
+%% Periodic RAM duration measurement; the work is done by
+%% update_ram_duration/1.
+handle_info(update_ram_duration, State) ->
+    Measured = update_ram_duration(State),
+    noreply(Measured);
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
@@ -357,6 +366,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+%% A sync_start is forwarded to our slave process only when that
+%% slave is one of the pids named in the message; otherwise it is
+%% ignored.
+handle_msg([SPid], _From, {sync_start, Ref, MPid, Targets}) ->
+    case lists:member(SPid, Targets) of
+        false -> ok;
+        true  -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid})
+    end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -815,6 +829,23 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+%% State-level wrapper around update_ram_duration/2: stores the
+%% refreshed backing queue state and marks the rate timer as
+%% 'just_measured'.
+update_ram_duration(State = #state{backing_queue       = BQ,
+                                   backing_queue_state = BQS}) ->
+    BQS1 = update_ram_duration(BQ, BQS),
+    State#state{rate_timer_ref      = just_measured,
+                backing_queue_state = BQS1}.
+
+%% Variant used while inside the sync receive loop: measures as
+%% update_ram_duration/2 does, then schedules the next
+%% 'update_ram_duration' message to ourselves. Returns the new timer
+%% reference together with the new backing queue state.
+update_ram_duration_sync(BQ, BQS) ->
+    NewBQS = update_ram_duration(BQ, BQS),
+    {erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+                       self(), update_ram_duration),
+     NewBQS}.
+
+%% Measures the backing queue's RAM duration, reports it to
+%% rabbit_memory_monitor and applies the duration target that comes
+%% back. Returns the resulting backing queue state.
+update_ram_duration(BQ, BQS) ->
+    {Measured, BQS1} = BQ:ram_duration(BQS),
+    Target = rabbit_memory_monitor:report_ram_duration(self(), Measured),
+    BQ:set_ram_duration_target(Target, BQS1).
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 0000000000..e121612056
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,143 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master/5, slave/6]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% ---------------------------------------------------------------------------
+
+%% Master side of a synchronisation run. Monitors every slave, waits
+%% until each is ready (or dead), folds over the backing queue sending
+%% every message to the slaves still alive, then completes the
+%% handshake with them. Returns the backing queue state produced by
+%% the fold. Throws {time_to_shutdown, Reason} if an 'EXIT' message is
+%% found in the mailbox while folding.
+master(Name, Ref, SPids, BQ, BQS) ->
+    Monitored = [{SPid, erlang:monitor(process, SPid)} || SPid <- SPids],
+    %% We wait for a reply from the slaves so that we know they are in
+    %% a receive block and will thus receive messages we send to them
+    %% *without* those messages ending up in their gen_server2 pqueue.
+    Ready = sync_foreach(Monitored, Ref, fun sync_receive_ready/3),
+    SendOne =
+        fun (Msg, MsgProps, {Count, Live, LastLog}) ->
+                %% Non-blocking check for an 'EXIT' message.
+                receive
+                    {'EXIT', _Pid, Reason} ->
+                        throw({time_to_shutdown, Reason})
+                after 0 ->
+                        ok
+                end,
+                Live1 = wait_for_credit(Live, Ref),
+                lists:foreach(
+                  fun ({SPid, _}) ->
+                          credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
+                          SPid ! {sync_message, Ref, Msg, MsgProps};
+                      (_) ->
+                          ok
+                  end, Live1),
+                %% Log progress at most once per SYNC_PROGRESS_INTERVAL
+                %% microseconds.
+                Elapsed = timer:now_diff(erlang:now(), LastLog),
+                LastLog1 =
+                    if
+                        Elapsed > ?SYNC_PROGRESS_INTERVAL ->
+                            rabbit_log:info(
+                              "Synchronising ~s: ~p messages~n",
+                              [rabbit_misc:rs(Name), Count]),
+                            erlang:now();
+                        true ->
+                            LastLog
+                    end,
+                {Count + 1, Live1, LastLog1}
+        end,
+    {{_, Remaining, _}, BQS1} =
+        BQ:fold(SendOne, {0, Ready, erlang:now()}, BQS),
+    sync_foreach(Remaining, Ref, fun sync_receive_complete/3),
+    BQS1.
+
+%% Loops until credit_flow no longer reports us as blocked, processing
+%% credit bumps and slave deaths on each pass. Returns the slaves that
+%% are still alive.
+%% NOTE(review): sync_receive_credit/3 polls with 'after 0', so this
+%% loop spins rather than sleeps while blocked.
+wait_for_credit(Live, Ref) ->
+    Blocked = credit_flow:blocked(),
+    if
+        Blocked ->
+            Live1 = sync_foreach(Live, Ref, fun sync_receive_credit/3),
+            wait_for_credit(Live1, Ref);
+        true ->
+            Live
+    end.
+
+%% Applies Fun(SPid, MRef, Ref) to each {SPid, MRef} pair in order and
+%% keeps the pairs for which it returns anything other than 'dead'.
+sync_foreach(SPidsMRefs, Ref, Fun) ->
+    lists:filter(fun ({SPid, MRef}) -> Fun(SPid, MRef, Ref) =/= dead;
+                     (_)            -> false
+                 end, SPidsMRefs).
+
+%% Blocks until slave SPid either announces it is ready for this sync
+%% (returns SPid) or its monitor fires (returns 'dead').
+sync_receive_ready(SPid, MRef, Ref) ->
+    Outcome = receive
+                  {'DOWN', MRef, _, SPid, _} -> dead;
+                  {sync_ready, Ref, SPid}    -> SPid
+              end,
+    Outcome.
+
+%% Drains, without blocking, the credit bumps waiting from slave SPid.
+%% Returns 'dead' if the slave's monitor has fired (after telling
+%% credit_flow the peer is down), otherwise SPid once nothing more is
+%% waiting.
+sync_receive_credit(SPid, MRef, Ref) ->
+    receive
+        {'DOWN', MRef, _, SPid, _} ->
+            credit_flow:peer_down(SPid),
+            dead;
+        {bump_credit, {SPid, _} = Bump} ->
+            credit_flow:handle_bump_msg(Bump),
+            sync_receive_credit(SPid, MRef, Ref)
+    after 0 ->
+            SPid
+    end.
+
+%% Tells slave SPid the sync is complete and waits for either its
+%% acknowledgement or its death. Afterwards the monitor is removed
+%% (flushing any 'DOWN' already delivered) and credit_flow is told the
+%% peer is gone.
+sync_receive_complete(SPid, MRef, Ref) ->
+    SPid ! {sync_complete, Ref},
+    ok = receive
+             {'DOWN', MRef, _, SPid, _}    -> ok;
+             {sync_complete_ok, Ref, SPid} -> ok
+         end,
+    erlang:demonitor(MRef, [flush]),
+    credit_flow:peer_down(SPid).
+
+%% ---------------------------------------------------------------------------
+
+%% Slave side of a synchronisation run: monitors the master, tells it
+%% we are ready, purges our backing queue and then enters the receive
+%% loop. The result is whatever slave_sync_loop/7 returns.
+slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) ->
+    MasterMRef = erlang:monitor(process, MPid),
+    MPid ! {sync_ready, Ref, self()},
+    {_Purged, EmptyBQS} = BQ:purge(BQS),
+    slave_sync_loop(Ref, TRef, MasterMRef, MPid, BQ, EmptyBQS,
+                    UpdateRamDuration).
+
+%% Receive loop run by the slave while master MPid is synchronising
+%% it. Besides the sync protocol messages it services credit bumps,
+%% the set_maximum_since_use / set_ram_duration_target casts and the
+%% periodic update_ram_duration message, so those keep working while
+%% the loop is running.
+%%
+%% Returns one of:
+%%   {ok, {TRef, BQS}}           - the master sent sync_complete
+%%   {failed, {TRef, BQS}}       - the master died; BQS has been purged
+%%   {stop, Reason, {TRef, BQS}} - an 'EXIT' message was received
+slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) ->
+    receive
+        {'DOWN', MRef, process, MPid, _Reason} ->
+            %% If the master dies half way we are not in the usual
+            %% half-synced state (with messages nearer the tail of the
+            %% queue); instead we have ones nearer the head. If we then
+            %% sync with a newly promoted master, or even just receive
+            %% messages from it, we have a hole in the middle. So the
+            %% only thing to do here is purge.
+            {_MsgCount, BQS1} = BQ:purge(BQS),
+            credit_flow:peer_down(MPid),
+            {failed, {TRef, BQS1}};
+        {bump_credit, Msg} ->
+            credit_flow:handle_bump_msg(Msg),
+            slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
+        {sync_complete, Ref} ->
+            MPid ! {sync_complete_ok, Ref, self()},
+            %% Flush as well as demonitor, as sync_receive_complete/3
+            %% does on the master side: if the master died just before
+            %% this point its 'DOWN' message would otherwise be left
+            %% in our mailbox after the loop returns.
+            erlang:demonitor(MRef, [flush]),
+            credit_flow:peer_down(MPid),
+            {ok, {TRef, BQS}};
+        {'$gen_cast', {set_maximum_since_use, Age}} ->
+            ok = file_handle_cache:set_maximum_since_use(Age),
+            slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
+        {'$gen_cast', {set_ram_duration_target, Duration}} ->
+            BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+            slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
+        update_ram_duration ->
+            %% UpdateRamDur returns a new timer ref, which replaces
+            %% TRef for the rest of the loop.
+            {TRef2, BQS1} = UpdateRamDur(BQ, BQS),
+            slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur);
+        {sync_message, Ref, Msg, Props} ->
+            credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
+            Props1 = Props#message_properties{needs_confirming = false},
+            BQS1 = BQ:publish(Msg, Props1, none, BQS),
+            slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
+        {'EXIT', _Pid, Reason} ->
+            {stop, Reason, {TRef, BQS}}
+    end.