summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl53
-rw-r--r--src/rabbit_mirror_queue_slave.erl29
-rw-r--r--src/rabbit_mirror_queue_sync.erl29
4 files changed, 62 insertions, 61 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 70dc8aee3d..10efc798db 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1154,17 +1154,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, From,
- State = #q{q = #amqqueue{name = Name},
- backing_queue = rabbit_mirror_queue_master = BQ,
+ State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
backing_queue_state = BQS}) ->
case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
- rabbit_amqqueue:lookup(Name),
- gen_server2:reply(From, ok),
+ 0 -> gen_server2:reply(From, ok),
try
- noreply(State#q{backing_queue_state =
- rabbit_mirror_queue_master:sync_mirrors(
- SPids -- SSPids, Name, BQS)})
+ BQS1 = rabbit_mirror_queue_master:sync_mirrors(BQS),
+ noreply(State#q{backing_queue_state = BQS1})
catch
{time_to_shutdown, Reason} ->
{stop, Reason, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e19c1a09fe..545f2219c8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,15 +26,16 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
--record(state, { gm,
+-record(state, { name,
+ gm,
coordinator,
backing_queue,
backing_queue_state,
@@ -50,7 +51,8 @@
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
--type(master_state() :: #state { gm :: pid(),
+-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
+ gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
@@ -60,9 +62,9 @@
known_senders :: set()
}).
--spec(promote_backing_queue_state/7 ::
- (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
- master_state()).
+-spec(promote_backing_queue_state/8 ::
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(),
+ [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
@@ -108,7 +110,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -124,13 +127,19 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
-sync_mirrors([], Name, State) ->
+sync_mirrors(State = #state{name = Name}) ->
+ {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(Name),
+ sync_mirrors(SPids -- SSPids, State).
+
+sync_mirrors([], State = #state{name = Name}) ->
rabbit_log:info("Synchronising ~s: nothing to do~n",
[rabbit_misc:rs(Name)]),
State;
-sync_mirrors(SPids, Name, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+sync_mirrors(SPids, State = #state { name = Name,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n",
[rabbit_misc:rs(Name), SPids, BQ:len(BQS)]),
Ref = make_ref(),
@@ -165,24 +174,23 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, State),
State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
-stop_all_slaves(Reason, #state{gm = GM}) ->
- Info = gm:info(GM),
- Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
- node(Pid) =/= node()],
+stop_all_slaves(Reason, #state{name = Name,
+ gm = GM}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ Slaves = [Pid || Pid <- SPids],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
- QName = proplists:get_value(group_name, Info),
rabbit_misc:execute_mnesia_transaction(
fun () ->
- [Q] = mnesia:read({rabbit_queue, QName}),
+ [Q] = mnesia:read({rabbit_queue, Name}),
rabbit_mirror_queue_misc:store_updated_slaves(
Q #amqqueue { gm_pids = [], slave_pids = [] })
end),
- ok = gm:forget_group(QName).
+ ok = gm:forget_group(Name).
purge(State = #state { gm = GM,
backing_queue = BQ,
@@ -414,17 +422,18 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- seen_status = SeenStatus,
+ seen_status = Seen,
confirmed = [],
ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 320c07a626..1ba6774a40 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -231,8 +231,14 @@ handle_cast({sync_start, Ref, MPid},
%% [0] We can only sync when there are no pending acks
%% [1] The master died so we do not need to set_delta even though
%% we purged since we will get a depth instruction soon.
- case rabbit_mirror_queue_sync:slave(Ref, TRef, MPid, BQ, BQS,
- fun update_ram_duration_sync/2) of
+ case rabbit_mirror_queue_sync:slave(
+ Ref, TRef, MPid, BQ, BQS,
+ fun (BQN, BQSN) ->
+ BQSN1 = update_ram_duration(BQN, BQSN),
+ TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRef, BQSN1}
+ end) of
{ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
{failed, Res} -> noreply(S(Res)); %% [1]
{stop, Reason, Res} -> {stop, Reason, S(Res)}
@@ -248,8 +254,10 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration, State) ->
- noreply(update_ram_duration(State));
+handle_info(update_ram_duration, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ noreply(State#state{rate_timer_ref = just_measured,
+ backing_queue_state = update_ram_duration(BQ, BQS)});
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(
@@ -542,7 +550,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, AckTags, SS, MPids),
+ QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -829,17 +837,6 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
-update_ram_duration(State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- State#state{rate_timer_ref = just_measured,
- backing_queue_state = update_ram_duration(BQ, BQS)}.
-
-update_ram_duration_sync(BQ, BQS) ->
- BQS1 = update_ram_duration(BQ, BQS),
- TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
- {TRef, BQS1}.
-
update_ram_duration(BQ, BQS) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index e121612056..36e9f1eb9f 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -32,7 +32,7 @@ master(Name, Ref, SPids, BQ, BQS) ->
%% We wait for a reply from the slaves so that we know they are in
%% a receive block and will thus receive messages we send to them
%% *without* those messages ending up in their gen_server2 pqueue.
- SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
+ SPidsMRefs1 = foreach_slave(SPidsMRefs, Ref, fun sync_receive_ready/3),
{{_, SPidsMRefs2, _}, BQS1} =
BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
receive
@@ -56,20 +56,19 @@ master(Name, Ref, SPids, BQ, BQS) ->
false -> Last
end}
end, {0, SPidsMRefs1, erlang:now()}, BQS),
- sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3),
+ foreach_slave(SPidsMRefs2, Ref, fun sync_receive_complete/3),
BQS1.
wait_for_credit(SPidsMRefs, Ref) ->
case credit_flow:blocked() of
- true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref,
- fun sync_receive_credit/3), Ref);
+ true -> wait_for_credit(foreach_slave(SPidsMRefs, Ref,
+ fun sync_receive_credit/3), Ref);
false -> SPidsMRefs
end.
-sync_foreach(SPidsMRefs, Ref, Fun) ->
+foreach_slave(SPidsMRefs, Ref, Fun) ->
[{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
- SPid1 <- [Fun(SPid, MRef, Ref)],
- SPid1 =/= dead].
+ Fun(SPid, MRef, Ref) =/= dead].
sync_receive_ready(SPid, MRef, Ref) ->
receive
@@ -102,9 +101,9 @@ slave(Ref, TRef, MPid, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, MPid),
MPid ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),
- slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDuration).
+ slave_sync_loop({Ref, MRef, MPid, BQ, UpdateRamDuration}, TRef, BQS1).
-slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) ->
+slave_sync_loop(Args = {Ref, MRef, MPid, BQ, UpdateRamDur}, TRef, BQS) ->
receive
{'DOWN', MRef, process, MPid, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -118,7 +117,7 @@ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) ->
{failed, {TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
+ slave_sync_loop(Args, TRef, BQS);
{sync_complete, Ref} ->
MPid ! {sync_complete_ok, Ref, self()},
erlang:demonitor(MRef),
@@ -126,18 +125,18 @@ slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur) ->
{ok, {TRef, BQS}};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS, UpdateRamDur);
+ slave_sync_loop(Args, TRef, BQS);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
+ slave_sync_loop(Args, TRef, BQS1);
update_ram_duration ->
{TRef2, BQS1} = UpdateRamDur(BQ, BQS),
- slave_sync_loop(Ref, TRef2, MRef, MPid, BQ, BQS1, UpdateRamDur);
+ slave_sync_loop(Args, TRef2, BQS1);
{sync_message, Ref, Msg, Props} ->
credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = BQ:publish(Msg, Props1, none, BQS),
- slave_sync_loop(Ref, TRef, MRef, MPid, BQ, BQS1, UpdateRamDur);
+ BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
+ slave_sync_loop(Args, TRef, BQS1);
{'EXIT', _Pid, Reason} ->
{stop, Reason, {TRef, BQS}}
end.