diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 |
2 files changed, 23 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 05075d0f2b..dadaef1de7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -155,7 +155,11 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ, [erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs], {Total, _BQS} = BQ:fold(fun (M = #basic_message{}, I) -> - [SPid ! {sync_message, Ref, M} || SPid <- SPids1], + wait_for_credit(), + [begin + credit_flow:send(SPid, ?CREDIT_DISC_BOUND), + SPid ! {sync_message, Ref, M} + end || SPid <- SPids1], case I rem 1000 of 0 -> rabbit_log:info( "Synchronising ~s: ~p messages~n", @@ -169,6 +173,15 @@ sync_mirrors(SPids, Name, #state { backing_queue = BQ, [rabbit_misc:rs(Name), Total]), ok. +wait_for_credit() -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), + wait_for_credit() + end; + false -> ok + end. + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ae069898d7..c1d2e8e44f 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -270,7 +270,8 @@ handle_info({sync_start, Ref, MPid}, MRef = erlang:monitor(process, MPid), MPid ! {sync_ready, Ref, self()}, {_MsgCount, BQS1} = BQ:purge(BQS), - noreply(sync_loop(Ref, MRef, State#state{backing_queue_state = BQS1})); + noreply( + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1})); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -839,10 +840,10 @@ record_synchronised(#amqqueue { name = QName }) -> end end). -sync_loop(Ref, MRef, State = #state{backing_queue = BQ, +sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> receive - {'DOWN', MRef, process, _MPid, _Reason} -> + {'DOWN', MRef, process, MPid, _Reason} -> %% If the master dies half way we are not in the usual %% half-synced state (with messages nearer the tail of the %% queue; instead we have ones nearer the head. If we then @@ -850,14 +851,18 @@ sync_loop(Ref, MRef, State = #state{backing_queue = BQ, %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge.) State#state{backing_queue_state = BQ:purge(BQS)}; + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + sync_loop(Ref, MRef, MPid, State); {sync_complete, Ref} -> erlang:demonitor(MRef), set_delta(0, State); {sync_message, Ref, M} -> + credit_flow:ack(MPid, ?CREDIT_DISC_BOUND), %% TODO expiry needs fixing Props = #message_properties{expiry = undefined, needs_confirming = false, delivered = true}, BQS1 = BQ:publish(M, Props, none, BQS), - sync_loop(Ref, MRef, State#state{backing_queue_state = BQS1}) + sync_loop(Ref, MRef, MPid, State#state{backing_queue_state = BQS1}) end. |
