summary refs log tree commit diff
diff options
context:
space:
mode:
author Alvaro Videla <videlalvaro@gmail.com> 2015-09-29 00:42:42 +0200
committer Alvaro Videla <videlalvaro@gmail.com> 2015-09-29 00:42:42 +0200
commit 676e4137e88e08d0a845f5dd9f2ea78650d4eb00 (patch)
tree 7e134f6111fbf67a6390a8d34370f3ec47d67b8b
parent f30aaa32cf9f3cd43e9531636f323a726812428a (diff)
download rabbitmq-server-git-676e4137e88e08d0a845f5dd9f2ea78650d4eb00.tar.gz
implement mirror message sync in batches
-rw-r--r-- src/rabbit_mirror_queue_sync.erl 111
1 files changed, 96 insertions, 15 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b76422ee6b..0bca3b2cc4 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -22,6 +22,8 @@
-define(SYNC_PROGRESS_INTERVAL, 1000000).
+-define(BATCH_SIZE, 20000).
+
%% There are three processes around, the master, the syncer and the
%% slave(s). The syncer is an intermediary, linked to the master in
%% order to make sure we do not mess with the master's credit flow or
@@ -94,7 +96,12 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
{'EXIT', Syncer, normal} -> {already_synced, BQS};
{'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
{ready, Syncer} -> EmitStats({syncing, 0}),
- master_go0(Args, BQ, BQS)
+ case maybe_batch() of
+ true ->
+ master_batch_go0(Args, BQ, BQS);
+ false ->
+ master_go0(Args, BQ, BQS)
+ end
end.
master_go0(Args, BQ, BQS) ->
@@ -108,21 +115,9 @@ master_go0(Args, BQ, BQS) ->
master_send(Msg, MsgProps, Unacked,
{Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
- Interval = time_compat:convert_time_unit(
- time_compat:monotonic_time() - Last, native, micro_seconds),
- T = case Interval > ?SYNC_PROGRESS_INTERVAL of
- true -> EmitStats({syncing, I}),
- Log("~p messages", [I]),
- time_compat:monotonic_time();
- false -> Last
- end,
+ T = maybe_emit_stats(Last, I, EmitStats, Log),
HandleInfo({syncing, I}),
- receive
- {'$gen_cast', {set_maximum_since_use, Age}} ->
- ok = file_handle_cache:set_maximum_since_use(Age)
- after 0 ->
- ok
- end,
+ handle_set_maximum_since_use(),
receive
{'$gen_call', From,
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
@@ -134,6 +129,46 @@ master_send(Msg, MsgProps, Unacked,
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
end.
+master_batch_go0(Args, BQ, BQS) -> %% Batched counterpart of master_go0: folds over the queue and hands messages to the syncer in groups.
+ Len = BQ:len(BQS), %% queue length, used below to recognise the last message so a short final batch is still flushed
+ case BQ:fold(fun (Msg, MsgProps, Unacked, {Batch, I, Curr, T}) -> %% Acc = {pending batch, count I advanced by master_batch_send, messages folded so far, last stats timestamp}
+ Batch1 = [{Msg, MsgProps, Unacked} | Batch], %% prepend, so the pending batch is held newest-first
+ Curr1 = Curr + 1,
+ Acc1 = {Batch1, I, Curr1, T},
+ case maybe_master_batch_send(Len, Curr1, ?BATCH_SIZE) of %% flush on every ?BATCH_SIZE-th message or on the Len-th message
+ true -> master_batch_send(Args, Acc1);
+ false -> {cont, Acc1}
+ end
+ end, {[], 0, 0, time_compat:monotonic_time()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1) %% any other fold result is passed on to master_done
+ end.
+
+maybe_master_batch_send(Len, Len, _BatchSize) -> %% Flush predicate: true when the second argument equals the first (same variable bound twice)
+ true;
+maybe_master_batch_send(_Len, Curr, BatchSize)
+ when Curr rem BatchSize =:= 0 -> %% also true when Curr is an exact multiple of BatchSize
+ true;
+maybe_master_batch_send(_Len, _Curr, _BatchSize) -> %% otherwise false: the caller keeps accumulating
+ false.
+
+master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, %% Sends one accumulated batch to the syncer; returns {cont, NewAcc} or {stop, Reason}
+ {Batch, I, Curr, Last}) ->
+ T = maybe_emit_stats(Last, I, EmitStats, Log), %% T is the (possibly refreshed) stats timestamp carried into the next accumulator
+ HandleInfo({syncing, I}),
+ handle_set_maximum_since_use(), %% picks up a queued set_maximum_since_use cast, if any, without blocking
+ receive %% no 'after' clause: waits until one of the four messages below arrives
+ {'$gen_call', From,
+ cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
+ gen_server2:reply(From, ok),
+ {stop, cancelled};
+ {next, Ref} -> Syncer ! {msgs, Ref, Batch}, %% Batch is forwarded exactly as the caller accumulated it
+ {cont, {[], I + length(Batch), Curr, T}}; %% start a fresh batch and add the sent batch's size to I
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
+ end.
+
master_done({Syncer, Ref, _Log, _HandleInfo, _EmitStats, Parent}, BQS) ->
receive
{next, Ref} -> stop_syncer(Syncer, {done, Ref}),
@@ -149,6 +184,26 @@ stop_syncer(Syncer, Msg) ->
after 0 -> ok
end.
+maybe_emit_stats(Last, I, EmitStats, Log) -> %% Reports progress if more than ?SYNC_PROGRESS_INTERVAL microseconds passed since Last; returns the timestamp to carry forward
+ Interval = time_compat:convert_time_unit(
+ time_compat:monotonic_time() - Last, native, micro_seconds), %% elapsed time since Last, converted from native units to microseconds
+ case Interval > ?SYNC_PROGRESS_INTERVAL of
+ true -> EmitStats({syncing, I}),
+ Log("~p messages", [I]),
+ time_compat:monotonic_time(); %% a fresh timestamp restarts the interval
+ false -> Last %% nothing emitted; the old timestamp is kept
+ end.
+
+handle_set_maximum_since_use() -> %% Handles at most one queued set_maximum_since_use cast, without blocking
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age) %% forwards Age to file_handle_cache; any result other than ok fails the match
+ after 0 -> %% zero timeout: returns ok at once when no such message is queued
+ ok
+ end.
+
+maybe_batch() -> true. %% Hard-coded to true, so the case in master_go always selects master_batch_go0
+
%% Master
%% ---------------------------------------------------------------------------
%% Syncer
@@ -191,6 +246,13 @@ syncer_loop(Ref, MPid, SPids) ->
SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
+ {msgs, Ref, Msgs} ->
+ SPids1 = wait_for_credit(SPids),
+ [begin
+ credit_flow:send(SPid),
+ SPid ! {sync_msgs, Ref, Msgs}
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
%% they interpret that as a failure, which is what we
@@ -272,6 +334,25 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
{[{Msg#basic_message.id, AckTag} | MA], BQS2}
end,
slave_sync_loop(Args, {MA1, TRef, BQS1});
+ {sync_msgs, Ref, Batch} ->
+ credit_flow:ack(Syncer),
+ %% we don't need to reverse BatchP1 and BatchPD1 since the
+ %% foldl took care of that.
+ {BatchP1, BatchPD1} =
+ lists:foldl(fun ({Msg, Props, false}, {BatchP, BatchPD}) ->
+ Props1 = Props#message_properties{needs_confirming = false},
+ {[{Msg, Props1, true, none, noflow} | BatchP], BatchPD};
+ ({Msg, Props, true}, {BatchP, BatchPD}) ->
+ Props1 = Props#message_properties{needs_confirming = false},
+ {BatchP, [{Msg, Props1, none, noflow} | BatchPD]}
+ end, {[], []}, Batch),
+ BQS1 = BQ:batch_publish(BatchP1, BQS),
+ {AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, BQS1),
+ MA1 =
+ lists:foldl(fun ({{Msg, _, _, _}, AckTag}, Acc) ->
+ [{Msg#basic_message.id, AckTag} | Acc]
+ end, MA, lists:zip(BatchPD1, AckTags)),
+ slave_sync_loop(Args, {MA1, TRef, BQS2});
{'EXIT', Parent, Reason} ->
{stop, Reason, State};
%% If the master throws an exception