diff options
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 49 |
1 files changed, 3 insertions, 46 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 8e63347094..534ef1afad 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -45,7 +45,7 @@ %% || <--- ready ---- || || %% || <--- next* ---- || || } %% || ---- msg* ----> || || } loop -%% || || ---- sync_msg* ----> || } +%% || || ---- sync_msgs* ---> || } %% || || <--- (credit)* ----- || } %% || <--- next ---- || || %% || ---- done ----> || || @@ -95,32 +95,10 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; {ready, Syncer} -> EmitStats({syncing, 0}), - case maybe_batch(SyncBatchSize) of - true -> - master_batch_go0(Args, SyncBatchSize, - BQ, BQS); - false -> - master_go0(Args, BQ, BQS) - end + master_batch_go0(Args, SyncBatchSize, + BQ, BQS) end. -master_go0(Args, BQ, BQS) -> - FoldFun = - fun (Msg, MsgProps, Unacked, Acc) -> - master_send(Msg, MsgProps, Unacked, Args, Acc) - end, - FoldAcc = {0, time_compat:monotonic_time()}, - bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). - -master_send(Msg, MsgProps, Unacked, - {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> - T = maybe_emit_stats(Last, I, EmitStats, Log), - HandleInfo({syncing, I}), - handle_set_maximum_since_use(), - SyncMsg = {msg, Ref, Msg, MsgProps, Unacked}, - NewAcc = {I + 1, T}, - master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). - master_batch_go0(Args, BatchSize, BQ, BQS) -> FoldFun = fun (Msg, MsgProps, Unacked, Acc) -> @@ -207,11 +185,6 @@ handle_set_maximum_since_use() -> ok end. -maybe_batch(SyncBatchSize) when SyncBatchSize > 1 -> - true; -maybe_batch(_SyncBatchSize) -> - false. - %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -247,10 +220,6 @@ await_slaves(Ref, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps, Unacked} -> - SPids1 = wait_for_credit(SPids), - broadcast(SPids1, {sync_msg, Ref, Msg, MsgProps, Unacked}), - syncer_loop(Ref, MPid, SPids1); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), @@ -330,18 +299,6 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), slave_sync_loop(Args, {MA, TRef1, BQS1}); - {sync_msg, Ref, Msg, Props, Unacked} -> - credit_flow:ack(Syncer), - Props1 = Props#message_properties{needs_confirming = false}, - {MA1, BQS1} = - case Unacked of - false -> {MA, - BQ:publish(Msg, Props1, true, none, noflow, BQS)}; - true -> {AckTag, BQS2} = BQ:publish_delivered( - Msg, Props1, none, noflow, BQS), - {[{Msg#basic_message.id, AckTag} | MA], BQS2} - end, - slave_sync_loop(Args, {MA1, TRef, BQS1}); {sync_msgs, Ref, Batch} -> credit_flow:ack(Syncer), {MA1, BQS1} = process_batch(Batch, MA, BQ, BQS), |
