summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_sync.erl49
1 files changed, 3 insertions, 46 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 8e63347094..534ef1afad 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -45,7 +45,7 @@
%% || <--- ready ---- || ||
%% || <--- next* ---- || || }
%% || ---- msg* ----> || || } loop
-%% || || ---- sync_msg* ----> || }
+%% || || ---- sync_msgs* ---> || }
%% || || <--- (credit)* ----- || }
%% || <--- next ---- || ||
%% || ---- done ----> || ||
@@ -95,32 +95,10 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
- case maybe_batch(SyncBatchSize) of
- true ->
- master_batch_go0(Args, SyncBatchSize,
- BQ, BQS);
- false ->
- master_go0(Args, BQ, BQS)
- end
+ master_batch_go0(Args, SyncBatchSize,
+ BQ, BQS)
end.
-master_go0(Args, BQ, BQS) ->
- FoldFun =
- fun (Msg, MsgProps, Unacked, Acc) ->
- master_send(Msg, MsgProps, Unacked, Args, Acc)
- end,
- FoldAcc = {0, time_compat:monotonic_time()},
- bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
-
-master_send(Msg, MsgProps, Unacked,
- {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
- T = maybe_emit_stats(Last, I, EmitStats, Log),
- HandleInfo({syncing, I}),
- handle_set_maximum_since_use(),
- SyncMsg = {msg, Ref, Msg, MsgProps, Unacked},
- NewAcc = {I + 1, T},
- master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
-
master_batch_go0(Args, BatchSize, BQ, BQS) ->
FoldFun =
fun (Msg, MsgProps, Unacked, Acc) ->
@@ -207,11 +185,6 @@ handle_set_maximum_since_use() ->
ok
end.
-maybe_batch(SyncBatchSize) when SyncBatchSize > 1 ->
- true;
-maybe_batch(_SyncBatchSize) ->
- false.
-
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -247,10 +220,6 @@ await_slaves(Ref, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps, Unacked} ->
- SPids1 = wait_for_credit(SPids),
- broadcast(SPids1, {sync_msg, Ref, Msg, MsgProps, Unacked}),
- syncer_loop(Ref, MPid, SPids1);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
@@ -330,18 +299,6 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
slave_sync_loop(Args, {MA, TRef1, BQS1});
- {sync_msg, Ref, Msg, Props, Unacked} ->
- credit_flow:ack(Syncer),
- Props1 = Props#message_properties{needs_confirming = false},
- {MA1, BQS1} =
- case Unacked of
- false -> {MA,
- BQ:publish(Msg, Props1, true, none, noflow, BQS)};
- true -> {AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, noflow, BQS),
- {[{Msg#basic_message.id, AckTag} | MA], BQS2}
- end,
- slave_sync_loop(Args, {MA1, TRef, BQS1});
{sync_msgs, Ref, Batch} ->
credit_flow:ack(Syncer),
{MA1, BQS1} = process_batch(Batch, MA, BQ, BQS),