summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-02 00:46:29 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-02 00:46:29 +0200
commite8fe201e57a045ecc9082ebbd492eab5fa99984e (patch)
treed46cf1e151f74e7f7df39c901c11c9365b4dc13e /src
parent0e89449e39f1538c00bceae0dc340833dc1a0fab (diff)
downloadrabbitmq-server-git-e8fe201e57a045ecc9082ebbd492eab5fa99984e.tar.gz
cosmetics
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_sync.erl53
1 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index eeb514a364..3fbe03fa02 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -135,15 +135,14 @@ master_batch_go0(Args, BatchSize, BQ, BQS) ->
FoldAcc = {[], 0, {0, BQ:len(BQS)}, time_compat:monotonic_time()},
bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
-bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
- case BQ:fold(FoldFun, FoldAcc, BQS) of
- {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
- {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
- {_, BQS1} -> master_done(Args, BQS1)
- end.
-
-append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
- {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
+master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
+ {Batch, I, {Curr, Len}, Last}) ->
+ T = maybe_emit_stats(Last, I, EmitStats, Log),
+ HandleInfo({syncing, I}),
+ handle_set_maximum_since_use(),
+ SyncMsg = {msgs, Ref, Batch},
+ NewAcc = {[], I + length(Batch), {Curr, Len}, T},
+ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
%% Either send messages when we reach the last one in the queue or
%% whenever we have accumulated BatchSize messages.
@@ -155,14 +154,15 @@ maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
maybe_master_batch_send(_Acc, _BatchSize) ->
false.
-master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {Batch, I, {Curr, Len}, Last}) ->
- T = maybe_emit_stats(Last, I, EmitStats, Log),
- HandleInfo({syncing, I}),
- handle_set_maximum_since_use(),
- SyncMsg = {msgs, Ref, Batch},
- NewAcc = {[], I + length(Batch), {Curr, Len}, T},
- master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
+bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
+ case BQ:fold(FoldFun, FoldAcc, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
+ end.
+
+append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
+ {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
receive
@@ -347,15 +347,18 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
{sync_msgs, Ref, Batch} ->
credit_flow:ack(Syncer),
%% we don't need to reverse BatchP1 and BatchPD1 since the
- %% foldl took care of that.
+ %% foldl takes care of that.
{BatchP1, BatchPD1} =
- lists:foldl(fun ({Msg, Props, false}, {BatchP, BatchPD}) ->
- Props1 = Props#message_properties{needs_confirming = false},
- {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD};
- ({Msg, Props, true}, {BatchP, BatchPD}) ->
- Props1 = Props#message_properties{needs_confirming = false},
- {BatchP, [{Msg, Props1, none, noflow} | BatchPD]}
- end, {[], []}, Batch),
+ lists:foldl(
+ fun ({Msg, Props, false}, {BatchP, BatchPD}) ->
+ Props1 = Props#message_properties{
+ needs_confirming = false},
+ {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD};
+ ({Msg, Props, true}, {BatchP, BatchPD}) ->
+ Props1 = Props#message_properties{
+ needs_confirming = false},
+ {BatchP, [{Msg, Props1, none, noflow} | BatchPD]}
+ end, {[], []}, Batch),
BQS1 = BQ:batch_publish(BatchP1, BQS),
{AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, BQS1),
MA1 =