summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_sync.erl13
1 files changed, 8 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b6d93c0d68..acf3e3e3c6 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -40,8 +40,8 @@
%% (from channel) || -- (spawns) --> || ||
%% || --------- sync_start (over GM) -------> ||
%% || || <--- sync_ready ---- ||
-%% || ----- msg* ---> || || }
-%% || <-- msg_ok* --- || || } loop
+%% || <--- next* ---- || || }
+%% || ---- msg* ----> || || } loop
%% || || ----- sync_msg* ---> || }
%% || || <---- (credit)* ---- || }
%% || ---- done ----> || ||
@@ -62,13 +62,15 @@ master_go(Syncer, Ref, Name, BQ, BQS) ->
master_send(SendArgs, I, Last, Msg, MsgProps)
end, {0, erlang:now()}, BQS),
Syncer ! {done, Ref},
+ receive
+ {next, Ref} -> ok
+ end,
case Acc of
{shutdown, Reason} -> {shutdown, Reason, BQS1};
_ -> {ok, BQS1}
end.
master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
- Syncer ! {msg, Ref, Msg, MsgProps},
Acc = {I + 1,
case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> rabbit_log:info("Synchronising ~s: ~p messages~n",
@@ -77,7 +79,8 @@ master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) ->
false -> Last
end},
receive
- {msg_ok, Ref} -> {cont, Acc};
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {cont, Acc};
{'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}}
end.
@@ -96,9 +99,9 @@ syncer(Ref, MPid, SPids) ->
unlink(MPid).
syncer_loop({Ref, MPid} = Args, SPidsMRefs) ->
+ MPid ! {next, Ref},
receive
{msg, Ref, Msg, MsgProps} ->
- MPid ! {msg_ok, Ref},
SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref),
[begin
credit_flow:send(SPid, ?CREDIT_DISC_BOUND),