diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 00:40:04 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-02 00:40:04 +0200 |
| commit | 0e89449e39f1538c00bceae0dc340833dc1a0fab (patch) | |
| tree | e7ca65fa8fb1b41f3453cb2522cbf9ca3b53d8e0 /src | |
| parent | d480e1db524df32680ccd66efd7cef56520199a9 (diff) | |
| download | rabbitmq-server-git-0e89449e39f1538c00bceae0dc340833dc1a0fab.tar.gz | |
refactors shared logic
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f0abb250ee..eeb514a364 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -99,73 +99,79 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) -> {ready, Syncer} -> EmitStats({syncing, 0}), case maybe_batch(SyncBatchSize) of true -> - master_batch_go0(Args, BQ, BQS); + master_batch_go0(Args, SyncBatchSize, + BQ, BQS); false -> master_go0(Args, BQ, BQS) end end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> - master_send(Msg, MsgProps, Unacked, Args, Acc) - end, {0, time_compat:monotonic_time()}, BQS) of - {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; - {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; - {_, BQS1} -> master_done(Args, BQS1) - end. + FoldFun = + fun (Msg, MsgProps, Unacked, Acc) -> + master_send(Msg, MsgProps, Unacked, Args, Acc) + end, + FoldAcc = {0, time_compat:monotonic_time()}, + bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). master_send(Msg, MsgProps, Unacked, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> T = maybe_emit_stats(Last, I, EmitStats, Log), HandleInfo({syncing, I}), handle_set_maximum_since_use(), - receive - {'$gen_call', From, - cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), - gen_server2:reply(From, ok), - {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, - {cont, {I + 1, T}}; - {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; - {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} - end. - -master_batch_go0(Args, BQ, BQS) -> - Len = BQ:len(BQS), - case BQ:fold(fun (Msg, MsgProps, Unacked, {Batch, I, Curr, T}) -> - Batch1 = [{Msg, MsgProps, Unacked} | Batch], - Curr1 = Curr + 1, - Acc1 = {Batch1, I, Curr1, T}, - case maybe_master_batch_send(Len, Curr1, ?BATCH_SIZE) of - true -> master_batch_send(Args, Acc1); - false -> {cont, Acc1} - end - end, {[], 0, 0, time_compat:monotonic_time()}, BQS) of + SyncMsg = {msg, Ref, Msg, MsgProps, Unacked}, + NewAcc = {I + 1, T}, + master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). + +master_batch_go0(Args, BatchSize, BQ, BQS) -> + FoldFun = + fun (Msg, MsgProps, Unacked, Acc) -> + Acc1 = append_to_acc(Msg, MsgProps, Unacked, Acc), + case maybe_master_batch_send(Acc, BatchSize) of + true -> master_batch_send(Args, Acc1); + false -> {cont, Acc1} + end + end, + FoldAcc = {[], 0, {0, BQ:len(BQS)}, time_compat:monotonic_time()}, + bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). + +bq_fold(FoldFun, FoldAcc, Args, BQ, BQS) -> + case BQ:fold(FoldFun, FoldAcc, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -maybe_master_batch_send(Len, Len, _BatchSize) -> +append_to_acc(Msg, MsgProps, Unacked, {Batch, I, {Curr, Len}, T}) -> + {[{Msg, MsgProps, Unacked} | Batch], I, {Curr + 1, Len}, T}. + +%% Either send messages when we reach the last one in the queue or +%% whenever we have accumulated BatchSize messages. +maybe_master_batch_send({_, _, {Len, Len}, _}, _BatchSize) -> true; -maybe_master_batch_send(_Len, Curr, BatchSize) +maybe_master_batch_send({_, _, {Curr, _Len}, _}, BatchSize) when Curr rem BatchSize =:= 0 -> true; -maybe_master_batch_send(_Len, _Curr, _BatchSize) -> +maybe_master_batch_send(_Acc, _BatchSize) -> false. master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {Batch, I, Curr, Last}) -> + {Batch, I, {Curr, Len}, Last}) -> T = maybe_emit_stats(Last, I, EmitStats, Log), HandleInfo({syncing, I}), handle_set_maximum_since_use(), + SyncMsg = {msgs, Ref, Batch}, + NewAcc = {[], I + length(Batch), {Curr, Len}, T}, + master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent). + +master_send_receive(SyncMsg, NewAcc, Syncer, Ref, Parent) -> receive {'$gen_call', From, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msgs, Ref, Batch}, - {cont, {[], I + length(Batch), Curr, T}}; + {next, Ref} -> Syncer ! SyncMsg, + {cont, NewAcc}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. |
