diff options
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index b6d93c0d68..acf3e3e3c6 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -40,8 +40,8 @@ %% (from channel) || -- (spawns) --> || || %% || --------- sync_start (over GM) -------> || %% || || <--- sync_ready ---- || -%% || ----- msg* ---> || || } -%% || <-- msg_ok* --- || || } loop +%% || <--- next* ---- || || } +%% || ---- msg* ----> || || } loop %% || || ----- sync_msg* ---> || } %% || || <---- (credit)* ---- || } %% || ---- done ----> || || @@ -62,13 +62,15 @@ master_go(Syncer, Ref, Name, BQ, BQS) -> master_send(SendArgs, I, Last, Msg, MsgProps) end, {0, erlang:now()}, BQS), Syncer ! {done, Ref}, + receive + {next, Ref} -> ok + end, case Acc of {shutdown, Reason} -> {shutdown, Reason, BQS1}; _ -> {ok, BQS1} end. master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> - Syncer ! {msg, Ref, Msg, MsgProps}, Acc = {I + 1, case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> rabbit_log:info("Synchronising ~s: ~p messages~n", @@ -77,7 +79,8 @@ master_send({Syncer, Ref, Name}, I, Last, Msg, MsgProps) -> false -> Last end}, receive - {msg_ok, Ref} -> {cont, Acc}; + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {cont, Acc}; {'EXIT', _Pid, Reason} -> {stop, {shutdown, Reason}} end. @@ -96,9 +99,9 @@ syncer(Ref, MPid, SPids) -> unlink(MPid). syncer_loop({Ref, MPid} = Args, SPidsMRefs) -> + MPid ! {next, Ref}, receive {msg, Ref, Msg, MsgProps} -> - MPid ! {msg_ok, Ref}, SPidsMRefs1 = wait_for_credit(SPidsMRefs, Ref), [begin credit_flow:send(SPid, ?CREDIT_DISC_BOUND), |
