summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-11-23 17:19:03 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-11-23 17:19:03 +0000
commitcfe5aa3fbf644ec69bab8a50e2c6e6bd531b1e5a (patch)
tree077bf36919616031fc6acda190b83dafef582747 /src
parent613bd3c3b1178da56a6b414f84ddd4724a8fd775 (diff)
downloadrabbitmq-server-git-cfe5aa3fbf644ec69bab8a50e2c6e6bd531b1e5a.tar.gz
Flow control for the sync process.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl15
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
2 files changed, 23 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 05075d0f2b..dadaef1de7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -155,7 +155,11 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ,
[erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs],
{Total, _BQS} =
BQ:fold(fun (M = #basic_message{}, I) ->
- [SPid ! {sync_message, Ref, M} || SPid <- SPids1],
+ wait_for_credit(),
+ [begin
+ credit_flow:send(SPid, ?CREDIT_DISC_BOUND),
+ SPid ! {sync_message, Ref, M}
+ end || SPid <- SPids1],
case I rem 1000 of
0 -> rabbit_log:info(
"Synchronising ~s: ~p messages~n",
@@ -169,6 +173,15 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ,
[rabbit_misc:rs(Name), Total]),
ok.
+wait_for_credit() ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
+ wait_for_credit()
+ end;
+ false -> ok
+ end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ae069898d7..c1d2e8e44f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -270,7 +270,8 @@ handle_info({sync_start, Ref, MPid},
MRef = erlang:monitor(process, MPid),
MPid ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQS),
- noreply(sync_loop(Ref, MRef, State#state{backing_queue_state = BQS1}));
+ noreply(
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}));
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -839,10 +840,10 @@ record_synchronised(#amqqueue { name = QName }) ->
end
end).
-sync_loop(Ref, MRef, State = #state{backing_queue = BQ,
+sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->
receive
- {'DOWN', MRef, process, _MPid, _Reason} ->
+ {'DOWN', MRef, process, MPid, _Reason} ->
%% If the master dies half way we are not in the usual
%% half-synced state (with messages nearer the tail of the
%% queue; instead we have ones nearer the head. If we then
@@ -850,14 +851,18 @@ sync_loop(Ref, MRef, State = #state{backing_queue = BQ,
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.)
State#state{backing_queue_state = BQ:purge(BQS)};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ sync_loop(Ref, MRef, MPid, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef),
set_delta(0, State);
{sync_message, Ref, M} ->
+ credit_flow:ack(MPid, ?CREDIT_DISC_BOUND),
%% TODO expiry needs fixing
Props = #message_properties{expiry = undefined,
needs_confirming = false,
delivered = true},
BQS1 = BQ:publish(M, Props, none, BQS),
- sync_loop(Ref, MRef, State#state{backing_queue_state = BQS1})
+ sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})
end.