diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-29 00:42:42 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-09-29 00:42:42 +0200 |
| commit | 676e4137e88e08d0a845f5dd9f2ea78650d4eb00 (patch) | |
| tree | 7e134f6111fbf67a6390a8d34370f3ec47d67b8b | |
| parent | f30aaa32cf9f3cd43e9531636f323a726812428a (diff) | |
| download | rabbitmq-server-git-676e4137e88e08d0a845f5dd9f2ea78650d4eb00.tar.gz | |
implement mirror message sync in batches
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 111 |
1 files changed, 96 insertions, 15 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index b76422ee6b..0bca3b2cc4 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -22,6 +22,8 @@ -define(SYNC_PROGRESS_INTERVAL, 1000000). +-define(BATCH_SIZE, 20000). + %% There are three processes around, the master, the syncer and the %% slave(s). The syncer is an intermediary, linked to the master in %% order to make sure we do not mess with the master's credit flow or @@ -94,7 +96,12 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> {'EXIT', Syncer, normal} -> {already_synced, BQS}; {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}; {ready, Syncer} -> EmitStats({syncing, 0}), - master_go0(Args, BQ, BQS) + case maybe_batch() of + true -> + master_batch_go0(Args, BQ, BQS); + false -> + master_go0(Args, BQ, BQS) + end end. master_go0(Args, BQ, BQS) -> @@ -108,21 +115,9 @@ master_go0(Args, BQ, BQS) -> master_send(Msg, MsgProps, Unacked, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> - Interval = time_compat:convert_time_unit( - time_compat:monotonic_time() - Last, native, micro_seconds), - T = case Interval > ?SYNC_PROGRESS_INTERVAL of - true -> EmitStats({syncing, I}), - Log("~p messages", [I]), - time_compat:monotonic_time(); - false -> Last - end, + T = maybe_emit_stats(Last, I, EmitStats, Log), HandleInfo({syncing, I}), - receive - {'$gen_cast', {set_maximum_since_use, Age}} -> - ok = file_handle_cache:set_maximum_since_use(Age) - after 0 -> - ok - end, + handle_set_maximum_since_use(), receive {'$gen_call', From, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), @@ -134,6 +129,46 @@ master_send(Msg, MsgProps, Unacked, {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} end. +master_batch_go0(Args, BQ, BQS) -> + Len = BQ:len(BQS), + case BQ:fold(fun (Msg, MsgProps, Unacked, {Batch, I, Curr, T}) -> + Batch1 = [{Msg, MsgProps, Unacked} | Batch], + Curr1 = Curr + 1, + Acc1 = {Batch1, I, Curr1, T}, + case maybe_master_batch_send(Len, Curr1, ?BATCH_SIZE) of + true -> master_batch_send(Args, Acc1); + false -> {cont, Acc1} + end + end, {[], 0, 0, time_compat:monotonic_time()}, BQS) of + {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; + {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; + {_, BQS1} -> master_done(Args, BQS1) + end. + +maybe_master_batch_send(Len, Len, _BatchSize) -> + true; +maybe_master_batch_send(_Len, Curr, BatchSize) + when Curr rem BatchSize =:= 0 -> + true; +maybe_master_batch_send(_Len, _Curr, _BatchSize) -> + false. + +master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, + {Batch, I, Curr, Last}) -> + T = maybe_emit_stats(Last, I, EmitStats, Log), + HandleInfo({syncing, I}), + handle_set_maximum_since_use(), + receive + {'$gen_call', From, + cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), + gen_server2:reply(From, ok), + {stop, cancelled}; + {next, Ref} -> Syncer ! {msgs, Ref, Batch}, + {cont, {[], I + length(Batch), Curr, T}}; + {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; + {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} + end. + master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) -> receive {next, Ref} -> stop_syncer(Syncer, {done, Ref}), @@ -149,6 +184,26 @@ stop_syncer(Syncer, Msg) -> after 0 -> ok end. +maybe_emit_stats(Last, I, EmitStats, Log) -> + Interval = time_compat:convert_time_unit( + time_compat:monotonic_time() - Last, native, micro_seconds), + case Interval > ?SYNC_PROGRESS_INTERVAL of + true -> EmitStats({syncing, I}), + Log("~p messages", [I]), + time_compat:monotonic_time(); + false -> Last + end. + +handle_set_maximum_since_use() -> + receive + {'$gen_cast', {set_maximum_since_use, Age}} -> + ok = file_handle_cache:set_maximum_since_use(Age) + after 0 -> + ok + end. + +maybe_batch() -> true. + %% Master %% --------------------------------------------------------------------------- %% Syncer @@ -191,6 +246,13 @@ syncer_loop(Ref, MPid, SPids) -> SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); + {msgs, Ref, Msgs} -> + SPids1 = wait_for_credit(SPids), + [begin + credit_flow:send(SPid), + SPid ! {sync_msgs, Ref, Msgs} + end || SPid <- SPids1], + syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do %% they interpret that as a failure, which is what we @@ -272,6 +334,25 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, {[{Msg#basic_message.id, AckTag} | MA], BQS2} end, slave_sync_loop(Args, {MA1, TRef, BQS1}); + {sync_msgs, Ref, Batch} -> + credit_flow:ack(Syncer), + %% we don't need to reverse BatchP1 and BatchPD1 since the + %% foldl took care of that. + {BatchP1, BatchPD1} = + lists:foldl(fun ({Msg, Props, false}, {BatchP, BatchPD}) -> + Props1 = Props#message_properties{needs_confirming = false}, + {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD}; + ({Msg, Props, true}, {BatchP, BatchPD}) -> + Props1 = Props#message_properties{needs_confirming = false}, + {BatchP, [{Msg, Props1, none, noflow} | BatchPD]} + end, {[], []}, Batch), + BQS1 = BQ:batch_publish(BatchP1, BQS), + {AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, BQS1), + MA1 = + lists:foldl(fun ({{Msg, _, _, _}, AckTag}, Acc) -> + [{Msg#basic_message.id, AckTag} | Acc] + end, MA, lists:zip(BatchPD1, AckTags)), + slave_sync_loop(Args, {MA1, TRef, BQS2}); {'EXIT', Parent, Reason} -> {stop, Reason, State}; %% If the master throws an exception |
