diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 00:46:29 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 00:46:29 +0200 |
| commit | e8fe201e57a045ecc9082ebbd492eab5fa99984e (patch) | |
| tree | d46cf1e151f74e7f7df39c901c11c9365b4dc13e /src | |
| parent | 0e89449e39f1538c00bceae0dc340833dc1a0fab (diff) | |
| download | rabbitmq-server-git-e8fe201e57a045ecc9082ebbd492eab5fa99984e.tar.gz | |
cosmetics
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index eeb514a364..3fbe03fa02 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -135,15 +135,14 @@ master_batch_go0(Args, BatchSize, BQ, BQS) -> FoldAcc = {[], 0, {0, BQ:len(BQS)}, time_compat:monotonic_time()}, bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). -bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) -> - case BQ:fold(FoldFun, FoldAcc, BQS) of - {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; - {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; - {_, BQS1} -> master_done(Args, BQS1) - end. - -append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) -> - {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}. +master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {Batch, I, {Curr, Len}, Last}) -> + T = maybe_emit_stats(Last, I, EmitStats, Log), + HandleInfo({syncing, I}), + handle_set_maximum_since_use(), + SyncMsg = {msgs, Ref, Batch}, + NewAcc = {[], I + length(Batch), {Curr, Len}, T}, + master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). %% Either send messages when we reach the last one in the queue or %% whenever we have accumulated BatchSize messages. @@ -155,14 +154,15 @@ maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize) maybe_master_batch_send(_Acc, _BatchSize) -> false. -master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {Batch, I, {Curr, Len}, Last}) -> - T = maybe_emit_stats(Last, I, EmitStats, Log), - HandleInfo({syncing, I}), - handle_set_maximum_since_use(), - SyncMsg = {msgs, Ref, Batch}, - NewAcc = {[], I + length(Batch), {Curr, Len}, T}, - master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). +bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) -> + case BQ:fold(FoldFun, FoldAcc, BQS) of + {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; + {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; + {_, BQS1} -> master_done(Args, BQS1) + end. + +append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) -> + {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}. master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) -> receive @@ -347,15 +347,18 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, {sync_msgs, Ref, Batch} -> credit_flow:ack(Syncer), %% we don't need to reverse BatchP1 and BatchPD1 since the - %% foldl took care of that. + %% foldl takes care of that. {BatchP1, BatchPD1} = - lists:foldl(fun ({Msg, Props, false}, {BatchP, BatchPD}) -> - Props1 = Props#message_properties{needs_confirming = false}, - {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD}; - ({Msg, Props, true}, {BatchP, BatchPD}) -> - Props1 = Props#message_properties{needs_confirming = false}, - {BatchP, [{Msg, Props1, none, noflow} | BatchPD]} - end, {[], []}, Batch), + lists:foldl( + fun ({Msg, Props, false}, {BatchP, BatchPD}) -> + Props1 = Props#message_properties{ + needs_confirming = false}, + {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD}; + ({Msg, Props, true}, {BatchP, BatchPD}) -> + Props1 = Props#message_properties{ + needs_confirming = false}, + {BatchP, [{Msg, Props1, none, noflow} | BatchPD]} + end, {[], []}, Batch), BQS1 = BQ:batch_publish(BatchP1, BQS), {AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, BQS1), MA1 = |
