diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 29 |
4 files changed, 62 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 70dc8aee3d..10efc798db 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1154,17 +1154,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, From, - State = #q{q = #amqqueue{name = Name}, - backing_queue = rabbit_mirror_queue_master = BQ, + State = #q{backing_queue = rabbit_mirror_queue_master = BQ, backing_queue_state = BQS}) -> case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = - rabbit_amqqueue:lookup(Name), - gen_server2:reply(From, ok), + 0 -> gen_server2:reply(From, ok), try - noreply(State#q{backing_queue_state = - rabbit_mirror_queue_master:sync_mirrors( - SPids -- SSPids, Name, BQS)}) + BQS1 = rabbit_mirror_queue_master:sync_mirrors(BQS), + noreply(State#q{backing_queue_state = BQS1}) catch {time_to_shutdown, Reason} -> {stop, Reason, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e19c1a09fe..545f2219c8 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,15 +26,16 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). +-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]). --export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]). +-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]). -behaviour(rabbit_backing_queue). -include("rabbit.hrl"). --record(state, { gm, +-record(state, { name, + gm, coordinator, backing_queue, backing_queue_state, @@ -50,7 +51,8 @@ -type(death_fun() :: fun ((pid()) -> 'ok')). -type(depth_fun() :: fun (() -> 'ok')). --type(master_state() :: #state { gm :: pid(), +-type(master_state() :: #state { name :: rabbit_amqqueue:name(), + gm :: pid(), coordinator :: pid(), backing_queue :: atom(), backing_queue_state :: any(), @@ -60,9 +62,9 @@ known_senders :: set() }). --spec(promote_backing_queue_state/7 :: - (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> - master_state()). +-spec(promote_backing_queue_state/8 :: + (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(), + [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). -spec(depth_fun/0 :: () -> depth_fun()). -spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> @@ -108,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> end), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, @@ -124,13 +127,19 @@ stop_mirroring(State = #state { coordinator = CPid, stop_all_slaves(shutdown, State), {BQ, BQS}. -sync_mirrors([], Name, State) -> +sync_mirrors(State = #state{name = Name}) -> + {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + sync_mirrors(SPids -- SSPids, State). + +sync_mirrors([], State = #state{name = Name}) -> rabbit_log:info("Synchronising ~s: nothing to do~n", [rabbit_misc:rs(Name)]), State; -sync_mirrors(SPids, Name, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +sync_mirrors(SPids, State = #state { name = Name, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n", [rabbit_misc:rs(Name), SPids, BQ:len(BQS)]), Ref = make_ref(), @@ -165,24 +174,23 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, State), State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}. -stop_all_slaves(Reason, #state{gm = GM}) -> - Info = gm:info(GM), - Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), - node(Pid) =/= node()], +stop_all_slaves(Reason, #state{name = Name, + gm = GM}) -> + {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + Slaves = [Pid || Pid <- SPids], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], %% Normally when we remove a slave another slave or master will %% notice and update Mnesia. But we just removed them all, and %% have stopped listening ourselves. So manually clean up. - QName = proplists:get_value(group_name, Info), rabbit_misc:execute_mnesia_transaction( fun () -> - [Q] = mnesia:read({rabbit_queue, QName}), + [Q] = mnesia:read({rabbit_queue, Name}), rabbit_mirror_queue_misc:store_updated_slaves( Q #amqqueue { gm_pids = [], slave_pids = [] }) end), - ok = gm:forget_group(QName). + ok = gm:forget_group(Name). purge(State = #state { gm = GM, backing_queue = BQ, @@ -414,17 +422,18 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> +promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), Len = BQ:len(BQS1), Depth = BQ:depth(BQS1), true = Len == Depth, %% ASSERTION: everything must have been requeued ok = gm:broadcast(GM, {depth, Depth}), - #state { gm = GM, + #state { name = QName, + gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS1, - seen_status = SeenStatus, + seen_status = Seen, confirmed = [], ack_msg_id = dict:new(), known_senders = sets:from_list(KS) }. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 320c07a626..1ba6774a40 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -231,8 +231,14 @@ handle_cast({sync_start, Ref, MPid}, %% [0] We can only sync when there are no pending acks %% [1] The master died so we do not need to set_delta even though %% we purged since we will get a depth instruction soon. - case rabbit_mirror_queue_sync:slave(Ref, TRef, MPid, BQ, BQS, - fun update_ram_duration_sync/2) of + case rabbit_mirror_queue_sync:slave( + Ref, TRef, MPid, BQ, BQS, + fun (BQN, BQSN) -> + BQSN1 = update_ram_duration(BQN, BQSN), + TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, + self(), update_ram_duration), + {TRef, BQSN1} + end) of {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] {failed, Res} -> noreply(S(Res)); %% [1] {stop, Reason, Res} -> {stop, Reason, S(Res)} @@ -248,8 +254,10 @@ handle_cast({set_ram_duration_target, Duration}, BQS1 = BQ:set_ram_duration_target(Duration, BQS), noreply(State #state { backing_queue_state = BQS1 }). -handle_info(update_ram_duration, State) -> - noreply(update_ram_duration(State)); +handle_info(update_ram_duration, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + noreply(State#state{rate_timer_ref = just_measured, + backing_queue_state = update_ram_duration(BQ, BQS)}); handle_info(sync_timeout, State) -> noreply(backing_queue_timeout( @@ -542,7 +550,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, AckTags, SS, MPids), + QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -829,17 +837,6 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) -> true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }). -update_ram_duration(State = #state { backing_queue = BQ, - backing_queue_state = BQS }) -> - State#state{rate_timer_ref = just_measured, - backing_queue_state = update_ram_duration(BQ, BQS)}. - -update_ram_duration_sync(BQ, BQS) -> - BQS1 = update_ram_duration(BQ, BQS), - TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, - self(), update_ram_duration), - {TRef, BQS1}. - update_ram_duration(BQ, BQS) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index e121612056..36e9f1eb9f 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -32,7 +32,7 @@ master(Name, Ref, SPids, BQ, BQS) -> %% We wait for a reply from the slaves so that we know they are in %% a receive block and will thus receive messages we send to them %% *without* those messages ending up in their gen_server2 pqueue. - SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), + SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3), {{_, SPidsMRefs2, _}, BQS1} = BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) -> receive @@ -56,20 +56,19 @@ master(Name, Ref, SPids, BQ, BQS) -> false -> Last end} end, {0, SPidsMRefs1, erlang:now()}, BQS), - sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), + foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3), BQS1. wait_for_credit(SPidsMRefs, Ref) -> case credit_flow:blocked() of - true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, - fun sync_receive_credit/3), Ref); + true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref, + fun sync_receive_credit/3), Ref); false -> SPidsMRefs end. -sync_foreach(SPidsMRefs, Ref, Fun) -> +foreach_slave(SPidsMRefs, Ref, Fun) -> [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, - SPid1 <- [Fun(SPid, MRef, Ref)], - SPid1 =/= dead]. + Fun(SPid, MRef, Ref) =/= dead]. sync_receive_ready(SPid, MRef, Ref) -> receive @@ -102,9 +101,9 @@ slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, MPid), MPid ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), - slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDuration). + slave_sync_loop({Ref, MRef, MPid, BQ, UpdateRamDuration}, TRef, BQS1). -slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) -> +slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) -> receive {'DOWN', MRef, process, MPid, _Reason} -> %% If the master dies half way we are not in the usual @@ -118,7 +117,7 @@ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) -> {failed, {TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); + slave_sync_loop(Args, TRef, BQS); {sync_complete, Ref} -> MPid ! {sync_complete_ok, Ref, self()}, erlang:demonitor(MRef), @@ -126,18 +125,18 @@ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) -> {ok, {TRef, BQS}}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), - slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur); + slave_sync_loop(Args, TRef, BQS); {'$gen_cast', {set_ram_duration_target, Duration}} -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); + slave_sync_loop(Args, TRef, BQS1); update_ram_duration -> {TRef2, BQS1} = UpdateRamDur(BQ, BQS), - slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur); + slave_sync_loop(Args, TRef2, BQS1); {sync_message, Ref, Msg, Props} -> credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), Props1 = Props#message_properties{needs_confirming = false}, - BQS1 = BQ:publish(Msg, Props1, none, BQS), - slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur); + BQS1 = BQ:publish(Msg, Props1, true, none, BQS), + slave_sync_loop(Args, TRef, BQS1); {'EXIT', _Pid, Reason} -> {stop, Reason, {TRef, BQS}} end. |
