diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 23:26:07 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 23:26:07 +0200 |
| commit | 192953364736aff37b00e17ae1952588a1cfc896 (patch) | |
| tree | e06fca3ac0f0afc2ca84ca55159025b3e961d023 /src | |
| parent | b43b3b9ba8ea60651dbae86e81f2da4ddf9f67a4 (diff) | |
| download | rabbitmq-server-git-192953364736aff37b00e17ae1952588a1cfc896.tar.gz | |
refactors message sync'ing in batches
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 81 |
1 files changed, 54 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f31979d9b6..8e63347094 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -344,33 +344,8 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, slave_sync_loop(Args, {MA1, TRef, BQS1}); {sync_msgs, Ref, Batch} -> credit_flow:ack(Syncer), - %% We need to partition the batch in messages that need to - %% be batch_publish/2 and the ones that need to be - %% batch_publish_delivered/2. - %% - %% The Batch has the messages in reverse order, but We - %% don't need to reverse them since BatchP1 and BatchPD1 - %% will have the right order after the fold1. - {BatchP1, BatchPD1} = - lists:foldl( - fun ({Msg, Props, false}, {BatchP, BatchPD}) -> - Props1 = Props#message_properties{ - needs_confirming = false}, - {[{Msg, Props1, true} | BatchP], - BatchPD}; - ({Msg, Props, true}, {BatchP, BatchPD}) -> - Props1 = Props#message_properties{ - needs_confirming = false}, - {BatchP, - [{Msg, Props1} | BatchPD]} - end, {[], []}, Batch), - BQS1 = BQ:batch_publish(BatchP1, none, noflow, BQS), - {AckTags, BQS2} = BQ:batch_publish_delivered(BatchPD1, none, noflow, BQS1), - MA1 = - lists:foldl(fun ({{Msg, _, _, _}, AckTag}, Acc) -> - [{Msg#basic_message.id, AckTag} | Acc] - end, MA, lists:zip(BatchPD1, AckTags)), - slave_sync_loop(Args, {MA1, TRef, BQS2}); + {MA1, BQS1} = process_batch(Batch, MA, BQ, BQS), + slave_sync_loop(Args, {MA1, TRef, BQS1}); {'EXIT', Parent, Reason} -> {stop, Reason, State}; %% If the master throws an exception @@ -378,3 +353,55 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, BQ:delete_and_terminate(Reason, BQS), {stop, Reason, {[], TRef, undefined}} end. + +%% We are partitioning messages by the Unacked element in the tuple. +%% when unacked = true, then it's a publish_delivered message, +%% otherwise it's a publish message. +%% +%% Note that we can't first partition the batch and then publish each +%% part, since that would result in re-ordering messages, which we +%% don't want to do. +process_batch([], MA, _BQ, BQS) -> + {MA, BQS}; +process_batch(Batch, MA, BQ, BQS) -> + {_Msg, _MsgProps, Unacked} = hd(Batch), + process_batch(Batch, Unacked, [], MA, BQ, BQS). + +process_batch([{Msg, Props, true = Unacked} | Rest], true = Unacked, + Acc, MA, BQ, BQS) -> + %% publish_delivered messages don't need the IsDelivered flag, + %% therefore we just add {Msg, Props} to the accumulator. + process_batch(Rest, Unacked, [{Msg, props(Props)} | Acc], + MA, BQ, BQS); +process_batch([{Msg, Props, false = Unacked} | Rest], false = Unacked, + Acc, MA, BQ, BQS) -> + %% publish messages needs the IsDelivered flag which is set to true + %% here. + process_batch(Rest, Unacked, [{Msg, props(Props), true} | Acc], + MA, BQ, BQS); +process_batch(Batch, Unacked, Acc, MA, BQ, BQS) -> + {MA1, BQS1} = publish_batch(Unacked, lists:reverse(Acc), MA, BQ, BQS), + process_batch(Batch, MA1, BQ, BQS1). + +%% Unacked msgs are published via batch_publish. +publish_batch(false, Batch, MA, BQ, BQS) -> + batch_publish(Batch, MA, BQ, BQS); +%% Acked msgs are published via batch_publish_delivered. +publish_batch(true, Batch, MA, BQ, BQS) -> + batch_publish_delivered(Batch, MA, BQ, BQS). + + +batch_publish(Batch, MA, BQ, BQS) -> + BQS1 = BQ:batch_publish(Batch, none, noflow, BQS), + {MA, BQS1}. + +batch_publish_delivered(Batch, MA, BQ, BQS) -> + {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS), + MA1 = + lists:foldl(fun ({{Msg, _}, AckTag}, MAs) -> + [{Msg#basic_message.id, AckTag} | MAs] + end, MA, lists:zip(Batch, AckTags)), + {MA1, BQS1}. + +props(Props) -> + Props#message_properties{needs_confirming = false}. |
