summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-26 14:03:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-26 14:03:09 +0000
commite3d8fac887180e047fffcdb2d1524b2a65eb8c1f (patch)
tree709101c51293195514d9da7c6483ff04fde9ccdc
parentfc09df6b4b12e10020ebb2a78235ffd15d7c0cbe (diff)
downloadrabbitmq-server-git-e3d8fac887180e047fffcdb2d1524b2a65eb8c1f.tar.gz
Fix bugs in previous commit, and be rather more thorough about monitoring and cleaning up credit flow.
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl65
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
4 files changed, 51 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ad81ba032c..e5f1cb908b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -174,7 +174,8 @@
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) ->
- 'ok' | error('queue_has_pending_acks') | error('queue_not_mirrored')).
+ 'ok' | rabbit_types:error('queue_has_pending_acks')
+ | rabbit_types:error('queue_not_mirrored')).
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7b167a9a66..8c2fafa653 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1179,8 +1179,9 @@ handle_call(sync_mirrors, From,
0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} =
rabbit_amqqueue:lookup(Name),
gen_server2:reply(From, ok),
- noreply(rabbit_mirror_queue_master:sync_mirrors(
- SPids -- SSPids, Name, BQS));
+ noreply(State#q{backing_queue_state =
+ rabbit_mirror_queue_master:sync_mirrors(
+ SPids -- SSPids, Name, BQS)});
_ -> reply({error, queue_has_pending_acks}, State)
end;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ea9e04fedf..542d724af1 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -144,44 +144,65 @@ sync_mirrors(SPids, Name, State = #state { backing_queue = BQ,
%% We wait for a reply from the slaves so that we know they are in
%% a receive block and will thus receive messages we send to them
%% *without* those messages ending up in their gen_server2 pqueue.
- SPids1 = [SPid1 || {SPid, MRef} <- SPidsMRefs,
- SPid1 <- [receive
- {'DOWN', MRef, process, SPid, _Reason} ->
- dead;
- {sync_ready, Ref, SPid} ->
- SPid
- end],
- SPid1 =/= dead],
- [erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs],
- {Total, BQS1} =
- BQ:fold(fun ({Msg, MsgProps}, I) ->
- wait_for_credit(),
+ SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3),
+ {{Total, SPidsMRefs2}, BQS1} =
+ BQ:fold(fun ({Msg, MsgProps}, {I, SPMR}) ->
+ SPMR1 = wait_for_credit(SPMR, Ref),
[begin
credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
SPid ! {sync_message, Ref, Msg, MsgProps}
- end || SPid <- SPids1],
+ end || {SPid, _} <- SPMR1],
case I rem 1000 of
0 -> rabbit_log:info(
"Synchronising ~s: ~p messages~n",
[rabbit_misc:rs(Name), I]);
_ -> ok
end,
- I + 1
- end, 0, BQS),
- [SPid ! {sync_complete, Ref} || SPid <- SPids1],
+ {I + 1, SPMR1}
+ end, {0, SPidsMRefs1}, BQS),
+ sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3),
rabbit_log:info("Synchronising ~s: ~p messages; complete~n",
[rabbit_misc:rs(Name), Total]),
State#state{backing_queue_state = BQS1}.
-wait_for_credit() ->
+wait_for_credit(SPidsMRefs, Ref) ->
case credit_flow:blocked() of
- true -> receive
- {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
- wait_for_credit()
- end;
- false -> ok
+ true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref,
+ fun sync_receive_credit/3), Ref);
+ false -> SPidsMRefs
end.
+sync_foreach(SPidsMRefs, Ref, Fun) ->
+ [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs,
+ SPid1 <- [Fun(SPid, MRef, Ref)],
+ SPid1 =/= dead].
+
+sync_receive_ready(SPid, MRef, Ref) ->
+ receive
+ {sync_ready, Ref, SPid} -> SPid;
+ {'DOWN', MRef, _, SPid, _} -> dead
+ end.
+
+sync_receive_credit(SPid, MRef, Ref) ->
+ receive
+ {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg),
+ sync_receive_credit(SPid, MRef, Ref);
+ {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid),
+ dead
+ after 0 ->
+ SPid
+ end.
+
+sync_receive_complete(SPid, MRef, Ref) ->
+ SPid ! {sync_complete, Ref},
+ receive
+ {sync_complete_ok, Ref, SPid} -> ok;
+ {'DOWN', MRef, _, SPid, _} -> ok
+ end,
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(SPid).
+
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f4130e0250..311c6ca658 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -851,12 +851,15 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.)
{_MsgCount, BQS1} = BQ:purge(BQS),
+ credit_flow:peer_down(MPid),
State#state{backing_queue_state = BQS1};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
sync_loop(Ref, MRef, MPid, State);
{sync_complete, Ref} ->
+ MPid ! {sync_complete_ok, Ref, self()},
erlang:demonitor(MRef),
+ credit_flow:peer_down(MPid),
%% We can only sync when there are no pending acks
set_delta(0, State);
{sync_message, Ref, Msg, Props0} ->