diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 |
4 files changed, 51 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad81ba032c..e5f1cb908b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -174,7 +174,8 @@ -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). -spec(sync_mirrors/1 :: (rabbit_types:amqqueue()) -> - 'ok' | error('queue_has_pending_acks') | error('queue_not_mirrored')). + 'ok' | rabbit_types:error('queue_has_pending_acks') + | rabbit_types:error('queue_not_mirrored')). -endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7b167a9a66..8c2fafa653 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1179,8 +1179,9 @@ handle_call(sync_mirrors, From, 0 -> {ok, #amqqueue{slave_pids = SPids, sync_slave_pids = SSPids}} = rabbit_amqqueue:lookup(Name), gen_server2:reply(From, ok), - noreply(rabbit_mirror_queue_master:sync_mirrors( - SPids -- SSPids, Name, BQS)); + noreply(State#q{backing_queue_state = + rabbit_mirror_queue_master:sync_mirrors( + SPids -- SSPids, Name, BQS)}); _ -> reply({error, queue_has_pending_acks}, State) end; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ea9e04fedf..542d724af1 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -144,44 +144,65 @@ sync_mirrors(SPids, Name, State = #state { backing_queue = BQ, %% We wait for a reply from the slaves so that we know they are in %% a receive block and will thus receive messages we send to them %% *without* those messages ending up in their gen_server2 pqueue. - SPids1 = [SPid1 || {SPid, MRef} <- SPidsMRefs, - SPid1 <- [receive - {'DOWN', MRef, process, SPid, _Reason} -> - dead; - {sync_ready, Ref, SPid} -> - SPid - end], - SPid1 =/= dead], - [erlang:demonitor(MRef) || {_, MRef} <- SPidsMRefs], - {Total, BQS1} = - BQ:fold(fun ({Msg, MsgProps}, I) -> - wait_for_credit(), + SPidsMRefs1 = sync_foreach(SPidsMRefs, Ref, fun sync_receive_ready/3), + {{Total, SPidsMRefs2}, BQS1} = + BQ:fold(fun ({Msg, MsgProps}, {I, SPMR}) -> + SPMR1 = wait_for_credit(SPMR, Ref), [begin credit_flow:send(SPid, ?CREDIT_DISC_BOUND), SPid ! {sync_message, Ref, Msg, MsgProps} - end || SPid <- SPids1], + end || {SPid, _} <- SPMR1], case I rem 1000 of 0 -> rabbit_log:info( "Synchronising ~s: ~p messages~n", [rabbit_misc:rs(Name), I]); _ -> ok end, - I + 1 - end, 0, BQS), - [SPid ! {sync_complete, Ref} || SPid <- SPids1], + {I + 1, SPMR1} + end, {0, SPidsMRefs1}, BQS), + sync_foreach(SPidsMRefs2, Ref, fun sync_receive_complete/3), rabbit_log:info("Synchronising ~s: ~p messages; complete~n", [rabbit_misc:rs(Name), Total]), State#state{backing_queue_state = BQS1}. -wait_for_credit() -> +wait_for_credit(SPidsMRefs, Ref) -> case credit_flow:blocked() of - true -> receive - {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - wait_for_credit() - end; - false -> ok + true -> wait_for_credit(sync_foreach(SPidsMRefs, Ref, + fun sync_receive_credit/3), Ref); + false -> SPidsMRefs end. +sync_foreach(SPidsMRefs, Ref, Fun) -> + [{SPid, MRef} || {SPid, MRef} <- SPidsMRefs, + SPid1 <- [Fun(SPid, MRef, Ref)], + SPid1 =/= dead]. + +sync_receive_ready(SPid, MRef, Ref) -> + receive + {sync_ready, Ref, SPid} -> SPid; + {'DOWN', MRef, _, SPid, _} -> dead + end. + +sync_receive_credit(SPid, MRef, Ref) -> + receive + {bump_credit, {SPid, _} = Msg} -> credit_flow:handle_bump_msg(Msg), + sync_receive_credit(SPid, MRef, Ref); + {'DOWN', MRef, _, SPid, _} -> credit_flow:peer_down(SPid), + dead + after 0 -> + SPid + end. + +sync_receive_complete(SPid, MRef, Ref) -> + SPid ! {sync_complete, Ref}, + receive + {sync_complete_ok, Ref, SPid} -> ok; + {'DOWN', MRef, _, SPid, _} -> ok + end, + erlang:demonitor(MRef, [flush]), + credit_flow:peer_down(SPid). + + terminate({shutdown, dropped} = Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f4130e0250..311c6ca658 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -851,12 +851,15 @@ sync_loop(Ref, MRef, MPid, State = #state{backing_queue = BQ, %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge.) {_MsgCount, BQS1} = BQ:purge(BQS), + credit_flow:peer_down(MPid), State#state{backing_queue_state = BQS1}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), sync_loop(Ref, MRef, MPid, State); {sync_complete, Ref} -> + MPid ! {sync_complete_ok, Ref, self()}, erlang:demonitor(MRef), + credit_flow:peer_down(MPid), %% We can only sync when there are no pending acks set_delta(0, State); {sync_message, Ref, Msg, Props0} -> |
