summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-11-18 20:06:40 +0100
committerAlvaro Videla <videlalvaro@gmail.com>2015-11-18 20:06:40 +0100
commit6fcfd70d7e70134fa007a448240d10dbfda8c344 (patch)
tree26945033c5810f8abc7044c322a02d3554560de7
parent4b09cda42c32dd21a6880fdc52cd9da663c77432 (diff)
downloadrabbitmq-server-git-6fcfd70d7e70134fa007a448240d10dbfda8c344.tar.gz
refactor handling of acks after batch_publish_delivered
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_sync.erl38
-rw-r--r--src/rabbit_priority_queue.erl21
-rw-r--r--src/rabbit_variable_queue.erl7
4 files changed, 34 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e63ee107b0..4556f72e78 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -23,7 +23,8 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ zip_msgs_and_acks/4]).
-export([start/1, stop/0, delete_crashed/1]).
@@ -492,6 +493,11 @@ set_queue_mode(Mode, State = #state { gm = GM,
BQS1 = BQ:set_queue_mode(Mode, BQS),
State #state { backing_queue_state = BQS1 }.
+zip_msgs_and_acks(Msgs, AckTags, Accumulator,
+ #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
+
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 62fc718f79..e9717743dc 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -352,46 +352,10 @@ batch_publish(Batch, MA, BQ, BQS) ->
BQS1 = BQ:batch_publish(Batch, none, noflow, BQS),
{MA, BQS1}.
-%% TODO
-%%
-%% The case clause in this function assumes that we are either dealing
-%% with a backing_queue that returns acktags as integers, or a
-%% priority queue.
-%% A possible fix to this would be to add a function
-%% to the BQ API where we pass a list of messages and acktags and the
-%% BQ implementation knows how to zip them together.
batch_publish_delivered(Batch, MA, BQ, BQS) ->
{AckTags, BQS1} = BQ:batch_publish_delivered(Batch, none, noflow, BQS),
- MA1 =
- case hd(AckTags) of
- HeadTag when is_integer(HeadTag) ->
- lists:foldl(fun ({{Msg, _}, AckTag}, MAs) ->
- [{msg_id(Msg), AckTag} | MAs]
- end, MA, lists:zip(Batch, AckTags));
- _AckTags ->
- %% priority queue processing of acktags
- BatchByPriority = batch_by_priority(Batch),
- lists:foldl(fun (Acks, MAs) ->
- {P, _AckTag} = hd(Acks),
- Pubs = orddict:fetch(P, BatchByPriority),
- MAs0 = zip_msgs_and_tags(Pubs, Acks),
- MAs ++ MAs0
- end, MA, AckTags)
- end,
+ MA1 = BQ:zip_msgs_and_acks(Batch, AckTags, MA),
{MA1, BQS1}.
-batch_by_priority(Batch) ->
- rabbit_priority_queue:partition_publish_delivered_batch(Batch).
-
-zip_msgs_and_tags(Pubs, AckTags) ->
- lists:zipwith(
- fun (Pub, AckTag) ->
- {Msg, _Props} = Pub,
- {msg_id(Msg), AckTag}
- end, Pubs, AckTags).
-
props(Props) ->
Props#message_properties{needs_confirming = false}.
-
-msg_id(#basic_message{ id = Id }) ->
- Id.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 46a3991d88..7b1125bc29 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -40,7 +40,8 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ zip_msgs_and_acks/4]).
%% for rabbit_mirror_queue_sync.
-export([partition_publish_delivered_batch/1]).
@@ -435,6 +436,18 @@ set_queue_mode(Mode, State = #state{bq = BQ}) ->
set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(set_queue_mode(Mode, BQS)).
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) ->
+ MsgsByPriority = partition_publish_delivered_batch(Msgs),
+ lists:foldl(fun (Acks, MAs) ->
+ {P, _AckTag} = hd(Acks),
+ Pubs = orddict:fetch(P, MsgsByPriority),
+ MAs0 = zip_msgs_and_acks(Pubs, Acks),
+ MAs ++ MAs0
+ end, Accumulator, AckTags);
+zip_msgs_and_acks(Msgs, AckTags, Accumulator,
+ #passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).
+
%%----------------------------------------------------------------------------
bq() ->
@@ -654,3 +667,9 @@ find_head_message_timestamp(BQ, [{_, BQSN} | Rest], Timestamp) ->
end;
find_head_message_timestamp(_, [], Timestamp) ->
Timestamp.
+
+zip_msgs_and_acks(Pubs, AckTags) ->
+ lists:zipwith(
+ fun ({#basic_message{ id = Id }, _Props}, AckTag) ->
+ {Id, AckTag}
+ end, Pubs, AckTags).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 19878580db..11e6171acf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -26,7 +26,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- multiple_routing_keys/0]).
+ zip_msgs_and_acks/4, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -920,6 +920,11 @@ set_queue_mode(default, State) ->
set_queue_mode(_, State) ->
State.
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
+ lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) ->
+ [{Id, AckTag} | Acc]
+ end, Accumulator, lists:zip(Msgs, AckTags)).
+
convert_to_lazy(State) ->
State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
set_ram_duration_target(0, State),