summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-02 00:40:04 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-02 00:40:04 +0200
commit0e89449e39f1538c00bceae0dc340833dc1a0fab (patch)
treee7ca65fa8fb1b41f3453cb2522cbf9ca3b53d8e0 /src
parentd480e1db524df32680ccd66efd7cef56520199a9 (diff)
downloadrabbitmq-server-git-0e89449e39f1538c00bceae0dc340833dc1a0fab.tar.gz
refactors shared logic
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_sync.erl78
1 files changed, 42 insertions, 36 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f0abb250ee..eeb514a364 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -99,73 +99,79 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) ->
{ready, Syncer} -> EmitStats({syncing, 0}),
case maybe_batch(SyncBatchSize) of
true ->
- master_batch_go0(Args, BQ, BQS);
+ master_batch_go0(Args, SyncBatchSize,
+ BQ, BQS);
false ->
master_go0(Args, BQ, BQS)
end
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
- master_send(Msg, MsgProps, Unacked, Args, Acc)
- end, {0, time_compat:monotonic_time()}, BQS) of
- {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
- {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
- {_, BQS1} -> master_done(Args, BQS1)
- end.
+ FoldFun =
+ fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
+ end,
+ FoldAcc = {0, time_compat:monotonic_time()},
+ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
master_send(Msg, MsgProps, Unacked,
{Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
T = maybe_emit_stats(Last, I, EmitStats, Log),
HandleInfo({syncing, I}),
handle_set_maximum_since_use(),
- receive
- {'$gen_call', From,
- cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
- gen_server2:reply(From, ok),
- {stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
- {cont, {I + 1, T}};
- {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
- {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
- end.
-
-master_batch_go0(Args, BQ, BQS) ->
- Len = BQ:len(BQS),
- case BQ:fold(fun (Msg, MsgProps, Unacked, {Batch, I, Curr, T}) ->
- Batch1 = [{Msg, MsgProps, Unacked} | Batch],
- Curr1 = Curr + 1,
- Acc1 = {Batch1, I, Curr1, T},
- case maybe_master_batch_send(Len, Curr1, ?BATCH_SIZE) of
- true -> master_batch_send(Args, Acc1);
- false -> {cont, Acc1}
- end
- end, {[], 0, 0, time_compat:monotonic_time()}, BQS) of
+ SyncMsg = {msg, Ref, Msg, MsgProps, Unacked},
+ NewAcc = {I + 1, T},
+ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
+
+master_batch_go0(Args, BatchSize, BQ, BQS) ->
+ FoldFun =
+ fun (Msg, MsgProps, Unacked, Acc) ->
+ Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc),
+ case maybe_master_batch_send(Acc, BatchSize) of
+ true -> master_batch_send(Args, Acc1);
+ false -> {cont, Acc1}
+ end
+ end,
+ FoldAcc = {[], 0, {0, BQ:len(BQS)}, time_compat:monotonic_time()},
+ bq_fold(FoldFun, FoldAcc, Args, BQ, BQS).
+
+bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) ->
+ case BQ:fold(FoldFun, FoldAcc, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-maybe_master_batch_send(Len, Len, _BatchSize) ->
+append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) ->
+ {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}.
+
+%% Either send messages when we reach the last one in the queue or
+%% whenever we have accumulated BatchSize messages.
+maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) ->
true;
-maybe_master_batch_send(_Len, Curr, BatchSize)
+maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize)
when Curr rem BatchSize =:= 0 ->
true;
-maybe_master_batch_send(_Len, _Curr, _BatchSize) ->
+maybe_master_batch_send(_Acc, _BatchSize) ->
false.
master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {Batch, I, Curr, Last}) ->
+ {Batch, I, {Curr, Len}, Last}) ->
T = maybe_emit_stats(Last, I, EmitStats, Log),
HandleInfo({syncing, I}),
handle_set_maximum_since_use(),
+ SyncMsg = {msgs, Ref, Batch},
+ NewAcc = {[], I + length(Batch), {Curr, Len}, T},
+ master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent).
+
+master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) ->
receive
{'$gen_call', From,
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msgs, Ref, Batch},
- {cont, {[], I + length(Batch), Curr, T}};
+ {next, Ref} -> Syncer ! SyncMsg,
+ {cont, NewAcc};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.