diff options
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index b39f7a35dd..a9aea96682 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -60,24 +60,26 @@ master_prepare(Ref, Log, SPids) -> spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end). master_go(Syncer, Ref, Log, InfoPull, InfoPush, BQ, BQS) -> - Args = {Syncer, Ref, Log, InfoPush, rabbit_misc:get_parent()}, + Args = {Syncer, Ref, Log, rabbit_misc:get_parent()}, receive {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; - {ready, Syncer} -> master_go0(InfoPull, Args, BQ, BQS) + {ready, Syncer} -> master_go0( + InfoPull, InfoPush, Args, BQ, BQS) end. -master_go0(InfoPull, Args, BQ, BQS) -> +master_go0(InfoPull, InfoPush, Args, BQ, BQS) -> + InfoPush({syncing, 0}), case BQ:fold(fun (Msg, MsgProps, {I, Last}) -> InfoPull({syncing, I}), - master_send(Args, I, Last, Msg, MsgProps) + master_send(Args, InfoPush, I, Last, Msg, MsgProps) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) -> +master_send({Syncer, Ref, Log, Parent}, InfoPush, I, Last, Msg, MsgProps) -> Acc = {I + 1, case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> InfoPush({syncing, I}), @@ -106,7 +108,7 @@ master_send({Syncer, Ref, Log, InfoPush, Parent}, I, Last, Msg, MsgProps) -> {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. -master_done({Syncer, Ref, _Log, _InfoPush, Parent}, BQS) -> +master_done({Syncer, Ref, _Log, Parent}, BQS) -> receive {next, Ref} -> unlink(Syncer), Syncer ! {done, Ref}, |
