diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-11-18 20:06:40 +0100 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-11-18 20:06:40 +0100 |
| commit | 6fcfd70d7e70134fa007a448240d10dbfda8c344 (patch) | |
| tree | 26945033c5810f8abc7044c322a02d3554560de7 | |
| parent | 4b09cda42c32dd21a6880fdc52cd9da663c77432 (diff) | |
| download | rabbitmq-server-git-6fcfd70d7e70134fa007a448240d10dbfda8c344.tar.gz | |
refactor handling of acks after batch_publish_delivered
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
4 files changed, 34 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e63ee107b0..4556f72e78 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -23,7 +23,8 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, + zip_msgs_and_acks/4]). -export([start/1, stop/0, delete_crashed/1]). @@ -492,6 +493,11 @@ set_queue_mode(Mode, State = #state { gm = GM, BQS1 = BQ:set_queue_mode(Mode, BQS), State #state { backing_queue_state = BQS1 }. +zip_msgs_and_acks(Msgs, AckTags, Accumulator, + #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS). + %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 62fc718f79..e9717743dc 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -352,46 +352,10 @@ batch_publish(Batch, MA, BQ, BQS) -> BQS1 = BQ:batch_publish(Batch, none, noflow, BQS), {MA, BQS1}. -%% TODO -%% -%% The case clause in this function assumes that we are either dealing -%% with a backing_queue that returns acktags as integers, or a -%% priority queue. -%% A possible fix to this would be to add a function -%% to the BQ API where we pass a list of messages and acktags and the -%% BQ implementation knows how to zip them together. batch_publish_delivered(Batch, MA, BQ, BQS) -> {AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS), - MA1 = - case hd(AckTags) of - HeadTag when is_integer(HeadTag) -> - lists:foldl(fun ({{Msg, _}, AckTag}, MAs) -> - [{msg_id(Msg), AckTag} | MAs] - end, MA, lists:zip(Batch, AckTags)); - _AckTags -> - %% priority queue processing of acktags - BatchByPriority = batch_by_priority(Batch), - lists:foldl(fun (Acks, MAs) -> - {P, _AckTag} = hd(Acks), - Pubs = orddict:fetch(P, BatchByPriority), - MAs0 = zip_msgs_and_tags(Pubs, Acks), - MAs ++ MAs0 - end, MA, AckTags) - end, + MA1 = BQ:zip_msgs_and_acks(Batch, AckTags, MA), {MA1, BQS1}. -batch_by_priority(Batch) -> - rabbit_priority_queue:partition_publish_delivered_batch(Batch). - -zip_msgs_and_tags(Pubs, AckTags) -> - lists:zipwith( - fun (Pub, AckTag) -> - {Msg, _Props} = Pub, - {msg_id(Msg), AckTag} - end, Pubs, AckTags). - props(Props) -> Props#message_properties{needs_confirming = false}. - -msg_id(#basic_message{ id = Id }) -> - Id. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 46a3991d88..7b1125bc29 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -40,7 +40,8 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2, + zip_msgs_and_acks/4]). %% for rabbit_mirror_queue_sync. -export([partition_publish_delivered_batch/1]). @@ -435,6 +436,18 @@ set_queue_mode(Mode, State = #state{bq = BQ}) -> set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(set_queue_mode(Mode, BQS)). +zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) -> + MsgsByPriority = partition_publish_delivered_batch(Msgs), + lists:foldl(fun (Acks, MAs) -> + {P, _AckTag} = hd(Acks), + Pubs = orddict:fetch(P, MsgsByPriority), + MAs0 = zip_msgs_and_acks(Pubs, Acks), + MAs ++ MAs0 + end, Accumulator, AckTags); +zip_msgs_and_acks(Msgs, AckTags, Accumulator, + #passthrough{bq = BQ, bqs = BQS}) -> + BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS). + %%---------------------------------------------------------------------------- bq() -> @@ -654,3 +667,9 @@ find_head_message_timestamp(BQ, [{_, BQSN} | Rest], Timestamp) -> end; find_head_message_timestamp(_, [], Timestamp) -> Timestamp. + +zip_msgs_and_acks(Pubs, AckTags) -> + lists:zipwith( + fun ({#basic_message{ id = Id }, _Props}, AckTag) -> + {Id, AckTag} + end, Pubs, AckTags). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 19878580db..11e6171acf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -26,7 +26,7 @@ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - multiple_routing_keys/0]). + zip_msgs_and_acks/4, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -920,6 +920,11 @@ set_queue_mode(default, State) -> set_queue_mode(_, State) -> State. +zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> + lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) -> + [{Id, AckTag} | Acc] + end, Accumulator, lists:zip(Msgs, AckTags)). + convert_to_lazy(State) -> State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = set_ram_duration_target(0, State), |
