summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-03 00:32:12 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-03 00:32:12 +0200
commit8076b1bd9ca5385c668bc979d12d19f0ce7dc9fa (patch)
tree279453fdbdd6ebb3bc6ebdaa1dbafbcd552fc4ad
parent5c2d50deed094e2e6e07990793bc8b09739a34c9 (diff)
downloadrabbitmq-server-git-8076b1bd9ca5385c668bc979d12d19f0ce7dc9fa.tar.gz
implements batch publishing for priority queues
-rw-r--r--src/rabbit_priority_queue.erl63
1 files changed, 48 insertions, 15 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index f94e7274d5..28cee163a1 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -35,7 +35,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
- batch_publish/2, batch_publish_delivered/2,
+ batch_publish/4, batch_publish_delivered/4,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -204,9 +204,18 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
-%% TODO fix
-batch_publish(_Publishes, _State) ->
- exit({not_implemented, {?MODULE, batch_publish}}).
+batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
+ PubDict = publishes_by_priority(
+ Publishes, fun ({Msg, _, _}) -> Msg end),
+ lists:foldl(
+ fun ({Priority, Pubs}, St) ->
+ pick1(fun (_P, BQSN) ->
+ BQ:batch_publish(Pubs, ChPid, Flow, BQSN)
+ end, Priority, St)
+ end, State, orddict:to_list(PubDict));
+batch_publish(Publishes, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)).
publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) ->
pick2(fun (P, BQSN) ->
@@ -218,9 +227,25 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
-%% TODO fix
-batch_publish_delivered(_Publishes, _State) ->
- exit({not_implemented, {?MODULE, batch_publish_delivered}}).
+batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
+ PubDict = publishes_by_priority(
+ Publishes, fun ({Msg, _}) -> Msg end),
+ {PrioritiesAndAcks, State1} =
+ lists:foldl(
+ fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
+ {PriosAndAcks1, St1} =
+ pick2(fun (P, BQSN) ->
+ {AckTags, BQSN1} =
+ BQ:batch_publish_delivered(
+ Pubs, ChPid, Flow, BQSN),
+ {{P, AckTags}, BQSN1}
+ end, Priority, St),
+ {[PriosAndAcks1 | PriosAndAcks], St1}
+ end, {[], State}, orddict:to_list(PubDict)),
+ {lists:reverse(PrioritiesAndAcks), State1};
+batch_publish_delivered(Publishes, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)).
%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
@@ -541,6 +566,11 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
+publishes_by_priority(Publishes, ExtractMsg) ->
+ lists:foldl(fun (Pub, Dict) ->
+ Msg = ExtractMsg(Pub),
+ rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
+ end, orddict:new(), Publishes).
priority(P, BQSs) when is_integer(P) ->
{P, bq_fetch(P, BQSs)};
@@ -549,18 +579,21 @@ priority(#basic_message{content = Content}, BQSs) ->
priority1(_Content, [{P, BQSN}]) ->
{P, BQSN};
-priority1(Content = #content{properties = Props},
- [{P, BQSN} | Rest]) ->
- #'P_basic'{priority = Priority0} = Props,
- Priority = case Priority0 of
- undefined -> 0;
- _ when is_integer(Priority0) -> Priority0
- end,
- case Priority >= P of
+priority1(Content, [{P, BQSN} | Rest]) ->
+ case priority2(Content) >= P of
true -> {P, BQSN};
false -> priority1(Content, Rest)
end.
+priority2(#basic_message{content = Content}) ->
+ priority2(rabbit_binary_parser:ensure_content_decoded(Content));
+priority2(#content{properties = Props}) ->
+ #'P_basic'{priority = Priority0} = Props,
+ case Priority0 of
+ undefined -> 0;
+ _ when is_integer(Priority0) -> Priority0
+ end.
+
add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;
add_maybe_infinity(A, B) -> A + B.