summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-10 23:26:07 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-10 23:26:07 +0200
commit192953364736aff37b00e17ae1952588a1cfc896 (patch)
treee06fca3ac0f0afc2ca84ca55159025b3e961d023 /src
parentb43b3b9ba8ea60651dbae86e81f2da4ddf9f67a4 (diff)
downloadrabbitmq-server-git-192953364736aff37b00e17ae1952588a1cfc896.tar.gz
refactors message sync'ing in batches
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_sync.erl81
1 files changed, 54 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f31979d9b6..8e63347094 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -344,33 +344,8 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
slave_sync_loop(Args, {MA1, TRef, BQS1});
{sync_msgs, Ref, Batch} ->
credit_flow:ack(Syncer),
- %% We need to partition the batch in messages that need to
- %% be batch_publish/2 and the ones that need to be
- %% batch_publish_delivered/2.
- %%
- %% The Batch has the messages in reverse order, but We
- %% don't need to reverse them since BatchP1 and BatchPD1
- %% will have the right order after the fold1.
- {BatchP1, BatchPD1} =
- lists:foldl(
- fun ({Msg, Props, false}, {BatchP, BatchPD}) ->
- Props1 = Props#message_properties{
- needs_confirming = false},
- {[{Msg, Props1, true} | BatchP],
- BatchPD};
- ({Msg, Props, true}, {BatchP, BatchPD}) ->
- Props1 = Props#message_properties{
- needs_confirming = false},
- {BatchP,
- [{Msg, Props1} | BatchPD]}
- end, {[], []}, Batch),
- BQS1 = BQ:batch_publish(BatchP1, none, noflow, BQS),
- {AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, none, noflow, BQS1),
- MA1 =
- lists:foldl(fun ({{Msg, _, _, _}, AckTag}, Acc) ->
- [{Msg#basic_message.id, AckTag} | Acc]
- end, MA, lists:zip(BatchPD1, AckTags)),
- slave_sync_loop(Args, {MA1, TRef, BQS2});
+ {MA1, BQS1} = process_batch(Batch, MA, BQ, BQS),
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
{stop, Reason, State};
%% If the master throws an exception
@@ -378,3 +353,55 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
BQ:delete_and_terminate(Reason, BQS),
{stop, Reason, {[], TRef, undefined}}
end.
+
+%% We are partitioning messages by the Unacked element in the tuple.
+%% when unacked = true, then it's a publish_delivered message,
+%% otherwise it's a publish message.
+%%
+%% Note that we can't first partition the batch and then publish each
+%% part, since that would result in re-ordering messages, which we
+%% don't want to do.
+process_batch([], MA, _BQ, BQS) ->
+ {MA, BQS};
+process_batch(Batch, MA, BQ, BQS) ->
+ {_Msg, _MsgProps, Unacked} = hd(Batch),
+ process_batch(Batch, Unacked, [], MA, BQ, BQS).
+
+process_batch([{Msg, Props, true = Unacked} | Rest], true = Unacked,
+ Acc, MA, BQ, BQS) ->
+ %% publish_delivered messages don't need the IsDelivered flag,
+ %% therefore we just add {Msg, Props} to the accumulator.
+ process_batch(Rest, Unacked, [{Msg, props(Props)} | Acc],
+ MA, BQ, BQS);
+process_batch([{Msg, Props, false = Unacked} | Rest], false = Unacked,
+ Acc, MA, BQ, BQS) ->
+ %% publish messages needs the IsDelivered flag which is set to true
+ %% here.
+ process_batch(Rest, Unacked, [{Msg, props(Props), true} | Acc],
+ MA, BQ, BQS);
+process_batch(Batch, Unacked, Acc, MA, BQ, BQS) ->
+ {MA1, BQS1} = publish_batch(Unacked, lists:reverse(Acc), MA, BQ, BQS),
+ process_batch(Batch, MA1, BQ, BQS1).
+
+%% Unacked msgs are published via batch_publish.
+publish_batch(false, Batch, MA, BQ, BQS) ->
+ batch_publish(Batch, MA, BQ, BQS);
+%% Acked msgs are published via batch_publish_delivered.
+publish_batch(true, Batch, MA, BQ, BQS) ->
+ batch_publish_delivered(Batch, MA, BQ, BQS).
+
+
+batch_publish(Batch, MA, BQ, BQS) ->
+ BQS1 = BQ:batch_publish(Batch, none, noflow, BQS),
+ {MA, BQS1}.
+
+batch_publish_delivered(Batch, MA, BQ, BQS) ->
+ {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS),
+ MA1 =
+ lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
+ [{Msg#basic_message.id, AckTag} | MAs]
+ end, MA, lists:zip(Batch, AckTags)),
+ {MA1, BQS1}.
+
+props(Props) ->
+ Props#message_properties{needs_confirming = false}.