diff options
| | | |
|---|---|---|
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-27 18:30:03 +0000 |
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-27 18:30:03 +0000 |
| commit | fb167b72174df758dc35d04787f220a6947c1752 (patch) | |
| tree | 95eb8bfebeb9b1881fb8e5cbd9a0286f2dca662f | |
| parent | 615ff0d8f91b95112616c9af05b48bec93e5377b (diff) | |
| parent | 0a808bbfd471334b3a4467e44ef4f07ee1869fc2 (diff) | |
| download | rabbitmq-server-git-fb167b72174df758dc35d04787f220a6947c1752.tar.gz | |
Merge default into bug24407, which involved a minor tweak since #message_properties.delivered is gone.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 143 |
5 files changed, 229 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7827b839d5..4bdab0bc01 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). --export([start_mirroring/1, stop_mirroring/1]). +-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -173,6 +173,8 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). +-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) -> + 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). -endif. @@ -590,6 +592,8 @@ set_maximum_since_use(QPid, Age) -> start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). +sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors). 
+ on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 74717acee5..70dc8aee3d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1153,6 +1153,28 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(requeue(AckTags, ChPid, State)); +handle_call(sync_mirrors, From, + State = #q{q = #amqqueue{name = Name}, + backing_queue = rabbit_mirror_queue_master = BQ, + backing_queue_state = BQS}) -> + case BQ:depth(BQS) - BQ:len(BQS) of + 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + gen_server2:reply(From, ok), + try + noreply(State#q{backing_queue_state = + rabbit_mirror_queue_master:sync_mirrors( + SPids -- SSPids, Name, BQS)}) + catch + {time_to_shutdown, Reason} -> + {stop, Reason, State} + end; + _ -> reply({error, pending_acks}, State) + end; + +handle_call(sync_mirrors, _From, State) -> + reply({error, not_mirrored}, State); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c8a361b1e0..e19c1a09fe 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -28,7 +28,7 @@ -export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). -behaviour(rabbit_backing_queue). @@ -124,6 +124,24 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. 
+sync_mirrors([], Name, State) -> + rabbit_log:info("Synchronising ~s: nothing to do~n", + [rabbit_misc:rs(Name)]), + State; +sync_mirrors(SPids, Name, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n", + [rabbit_misc:rs(Name), SPids, BQ:len(BQS)]), + Ref = make_ref(), + %% We send the start over GM to flush out any other messages that + %% we might have sent that way already. + gm:broadcast(GM, {sync_start, Ref, self(), SPids}), + BQS1 = rabbit_mirror_queue_sync:master(Name, Ref, SPids, BQ, BQS), + rabbit_log:info("Synchronising ~s: complete~n", + [rabbit_misc:rs(Name)]), + State#state{backing_queue_state = BQS1}. + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9354f48514..320c07a626 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -222,6 +222,22 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, end, noreply(maybe_enqueue_message(Delivery, State)); +handle_cast({sync_start, Ref, MPid}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), + S = fun({TRefN, BQSN}) -> State1#state{rate_timer_ref = TRefN, + backing_queue_state = BQSN} end, + %% [0] We can only sync when there are no pending acks + %% [1] The master died so we do not need to set_delta even though + %% we purged since we will get a depth instruction soon. 
+ case rabbit_mirror_queue_sync:slave(Ref, TRef, MPid, BQ, BQS, + fun update_ram_duration_sync/2) of + {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {failed, Res} -> noreply(S(Res)); %% [1] + {stop, Reason, Res} -> {stop, Reason, S(Res)} + end; + handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); @@ -232,15 +248,8 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, - State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }); +handle_info(update_ram_duration, State) -> + noreply(update_ram_duration(State)); handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( @@ -357,6 +366,11 @@ handle_msg([_SPid], _From, process_death) -> handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; +handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) -> + case lists:member(SPid, SPids) of + true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid}); + false -> ok + end; handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). @@ -815,6 +829,23 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). +update_ram_duration(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State#state{rate_timer_ref = just_measured, + backing_queue_state = update_ram_duration(BQ, BQS)}. 
+ +update_ram_duration_sync(BQ, BQS) -> + BQS1 = update_ram_duration(BQ, BQS), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRef, BQS1}. + +update_ram_duration(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQ:set_ram_duration_target(DesiredDuration, BQS1). + record_synchronised(#amqqueue { name = QName }) -> Self = self(), rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl new file mode 100644 index 0000000000..e121612056 --- /dev/null +++ b/src/rabbit_mirror_queue_sync.erl @@ -0,0 +1,143 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_sync). + +-include("rabbit.hrl"). + +-export([master/5, slave/6]). + +-define(SYNC_PROGRESS_INTERVAL, 1000000). + +%% --------------------------------------------------------------------------- + +master(Name, Ref, SPids, BQ, BQS) -> + SPidsMRefs = [begin + MRef = erlang:monitor(process, SPid), + {SPid, MRef} + end || SPid <- SPids], + %% We wait for a reply from the slaves so that we know they are in + %% a receive block and will thus receive messages we send to them + %% *without* those messages ending up in their gen_server2 pqueue. 
+ SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), + {{_, SPidsMRefs2, _}, BQS1} = + BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> + receive + {'EXIT', _Pid, Reason} -> + throw({time_to_shutdown, Reason}) + after 0 -> + ok + end, + SPMR1 = wait_for_credit(SPMR, Ref), + [begin + credit_flow:send(SPid, ?CREDIT_DISC_BOUND), + SPid ! {sync_message, Ref, Msg, MsgProps} + end || {SPid, _} <- SPMR1], + {I + 1, SPMR1, + case timer:now_diff(erlang:now(), Last) > + ?SYNC_PROGRESS_INTERVAL of + true -> rabbit_log:info( + "Synchronising ~s: ~p messages~n", + [rabbit_misc:rs(Name), I]), + erlang:now(); + false -> Last + end} + end, {0, SPidsMRefs1, erlang:now()}, BQS), + sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), + BQS1. + +wait_for_credit(SPidsMRefs, Ref) -> + case credit_flow:blocked() of + true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, + fun sync_receive_credit/3), Ref); + false -> SPidsMRefs + end. + +sync_foreach(SPidsMRefs, Ref, Fun) -> + [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, + SPid1 <- [Fun(SPid, MRef, Ref)], + SPid1 =/= dead]. + +sync_receive_ready(SPid, MRef, Ref) -> + receive + {sync_ready, Ref, SPid} -> SPid; + {'DOWN', MRef, _, SPid, _} -> dead + end. + +sync_receive_credit(SPid, MRef, Ref) -> + receive + {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), + sync_receive_credit(SPid, MRef, Ref); + {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), + dead + after 0 -> + SPid + end. + +sync_receive_complete(SPid, MRef, Ref) -> + SPid ! {sync_complete, Ref}, + receive + {sync_complete_ok, Ref, SPid} -> ok; + {'DOWN', MRef, _, SPid, _} -> ok + end, + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(SPid). + +%% --------------------------------------------------------------------------- + +slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) -> + MRef = erlang:monitor(process, MPid), + MPid ! 
{sync_ready, Ref, self()}, + {_MsgCount, BQS1} = BQ:purge(BQS), + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDuration). + +slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) -> + receive + {'DOWN', MRef, process, MPid, _Reason} -> + %% If the master dies half way we are not in the usual + %% half-synced state (with messages nearer the tail of the + %% queue); instead we have ones nearer the head. If we then + %% sync with a newly promoted master, or even just receive + %% messages from it, we have a hole in the middle. So the + %% only thing to do here is purge. + {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(MPid), + {failed, {TRef, BQS1}}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); + {sync_complete, Ref} -> + MPid ! {sync_complete_ok, Ref, self()}, + erlang:demonitor(MRef), + credit_flow:peer_down(MPid), + {ok, {TRef, BQS}}; + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age), + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); + {'$gen_cast', {set_ram_duration_target, Duration}} -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); + update_ram_duration -> + {TRef2, BQS1} = UpdateRamDur(BQ, BQS), + slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur); + {sync_message, Ref, Msg, Props} -> + credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), + Props1 = Props#message_properties{needs_confirming = false}, + BQS1 = BQ:publish(Msg, Props1, none, BQS), + slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); + {'EXIT', _Pid, Reason} -> + {stop, Reason, {TRef, BQS}} + end. |
