summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-26 14:30:55 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-26 14:30:55 +0000
commit4f849e262b7ff7327e913b27d00ce83a6b4e0e43 (patch)
tree6f591b27b81832bd68fa603ad23010a57dba6cc8
parent7686980bd151e5d3c46220bfcfcdc1b01d25b821 (diff)
parent596aae238c097155b24fbc81810be33d96c63388 (diff)
downloadrabbitmq-server-git-4f849e262b7ff7327e913b27d00ce83a6b4e0e43.tar.gz
Merge in default
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_mirror_queue_master.erl85
-rw-r--r--src/rabbit_mirror_queue_slave.erl44
4 files changed, 151 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7827b839d5..c1884118a9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -173,6 +173,9 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) ->
+ 'ok' | rabbit_types:error('queue_has_pending_acks')
+ | rabbit_types:error('queue_not_mirrored')).
-endif.
@@ -590,6 +593,8 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
+sync_mirrors(#amqqueue{pid = QPid}) -> delegate_call(QPid, sync_mirrors).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe14baa728..60857e7e3d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1154,6 +1154,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call(sync_mirrors, From,
+ State = #q{q = #amqqueue{name = Name},
+ backing_queue = rabbit_mirror_queue_master = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:depth(BQS) - BQ:len(BQS) of
+ 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(Name),
+ gen_server2:reply(From, ok),
+ noreply(State#q{backing_queue_state =
+ rabbit_mirror_queue_master:sync_mirrors(
+ SPids -- SSPids, Name, BQS)});
+ _ -> reply({error, queue_has_pending_acks}, State)
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, queue_not_mirrored}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8fcd1893a6..64b78fbbdb 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -28,7 +28,7 @@
-export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/3]).
-behaviour(rabbit_backing_queue).
@@ -45,6 +45,8 @@
known_senders
}).
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
-ifdef(use_specs).
-export_type([death_fun/0, depth_fun/0]).
@@ -127,6 +129,87 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+sync_mirrors([], Name, State) ->
+ rabbit_log:info("Synchronising ~s: nothing to do~n",
+ [rabbit_misc:rs(Name)]),
+ State;
+sync_mirrors(SPids, Name, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ rabbit_log:info("Synchronising ~s with slaves ~p: ~p messages to do~n",
+ [rabbit_misc:rs(Name), SPids, BQ:len(BQS)]),
+ Ref = make_ref(),
+ %% We send the start over GM to flush out any other messages that
+ %% we might have sent that way already.
+ gm:broadcast(GM, {sync_start, Ref, self(), SPids}),
+ SPidsMRefs = [begin
+ MRef = erlang:monitor(process, SPid),
+ {SPid, MRef}
+ end || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
+ {{_, SPidsMRefs2, _}, BQS1} =
+ BQ:fold(fun (Msg, MsgProps, {I, SPMR, Last}) ->
+ SPMR1 = wait_for_credit(SPMR, Ref),
+ [begin
+ credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
+ SPid ! {sync_message, Ref, Msg, MsgProps}
+ end || {SPid, _} <- SPMR1],
+ {I + 1, SPMR1,
+ case timer:now_diff(erlang:now(), Last) >
+ ?SYNC_PROGRESS_INTERVAL of
+ true -> rabbit_log:info(
+ "Synchronising ~s: ~p messages~n",
+ [rabbit_misc:rs(Name), I]),
+ erlang:now();
+ false -> Last
+ end}
+ end, {0, SPidsMRefs1, erlang:now()}, BQS),
+ sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3),
+ rabbit_log:info("Synchronising ~s: complete~n",
+ [rabbit_misc:rs(Name)]),
+ State#state{backing_queue_state = BQS1}.
+
+wait_for_credit(SPidsMRefs, Ref) ->
+ case credit_flow:blocked() of
+ true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref,
+ fun sync_receive_credit/3), Ref);
+ false -> SPidsMRefs
+ end.
+
+sync_foreach(SPidsMRefs, Ref, Fun) ->
+ [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
+ SPid1 <- [Fun(SPid, MRef, Ref)],
+ SPid1 =/= dead].
+
+sync_receive_ready(SPid, MRef, Ref) ->
+ receive
+ {sync_ready, Ref, SPid} -> SPid;
+ {'DOWN', MRef, _, SPid, _} -> dead
+ end.
+
+sync_receive_credit(SPid, MRef, Ref) ->
+ receive
+ {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
+ sync_receive_credit(SPid, MRef, Ref);
+ {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
+ dead
+ after 0 ->
+ SPid
+ end.
+
+sync_receive_complete(SPid, MRef, Ref) ->
+ SPid ! {sync_complete, Ref},
+ receive
+ {sync_complete_ok, Ref, SPid} -> ok;
+ {'DOWN', MRef, _, SPid, _} -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(SPid).
+
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cb7a2135c3..5ea146988a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -222,6 +222,15 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, MPid},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ MRef = erlang:monitor(process, MPid),
+ MPid ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ noreply(
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}));
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -358,6 +367,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, MPid, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> ok = gen_server2:cast(SPid, {sync_start, Ref, MPid});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -829,3 +843,33 @@ record_synchronised(#amqqueue { name = QName }) ->
ok
end
end).
+
+sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ receive
+ {'DOWN', MRef, process, MPid, _Reason} ->
+ %% If the master dies half way we are not in the usual
+ %% half-synced state (with messages nearer the tail of the
+ %% queue; instead we have ones nearer the head. If we then
+ %% sync with a newly promoted master, or even just receive
+ %% messages from it, we have a hole in the middle. So the
+ %% only thing to do here is purge.)
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ credit_flow:peer_down(MPid),
+ State#state{backing_queue_state = BQS1};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ sync_loop(Ref, MRef, MPid, State);
+ {sync_complete, Ref} ->
+ MPid ! {sync_complete_ok, Ref, self()},
+ erlang:demonitor(MRef),
+ credit_flow:peer_down(MPid),
+ %% We can only sync when there are no pending acks
+ set_delta(0, State);
+ {sync_message, Ref, Msg, Props0} ->
+ credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
+ Props = Props0#message_properties{needs_confirming = false,
+ delivered = true},
+ BQS1 = BQ:publish(Msg, Props, none, BQS),
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})
+ end.