diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-03 00:32:12 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-03 00:32:12 +0200 |
| commit | 8076b1bd9ca5385c668bc979d12d19f0ce7dc9fa (patch) | |
| tree | 279453fdbdd6ebb3bc6ebdaa1dbafbcd552fc4ad | |
| parent | 5c2d50deed094e2e6e07990793bc8b09739a34c9 (diff) | |
| download | rabbitmq-server-git-8076b1bd9ca5385c668bc979d12d19f0ce7dc9fa.tar.gz | |
implements batch publishing for priority queues
| -rw-r--r-- | src/rabbit_priority_queue.erl | 63 |
1 files changed, 48 insertions, 15 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index f94e7274d5..28cee163a1 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -35,7 +35,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, publish/6, publish_delivered/5, discard/4, drain_confirmed/1, - batch_publish/2, batch_publish_delivered/2, + batch_publish/4, batch_publish_delivered/4, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -204,9 +204,18 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). -%% TODO fix -batch_publish(_Publishes, _State) -> - exit({not_implemented, {?MODULE, batch_publish}}). +batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> + PubDict = publishes_by_priority( + Publishes, fun ({Msg, _, _}) -> Msg end), + lists:foldl( + fun ({Priority, Pubs}, St) -> + pick1(fun (_P, BQSN) -> + BQ:batch_publish(Pubs, ChPid, Flow, BQSN) + end, Priority, St) + end, State, orddict:to_list(PubDict)); +batch_publish(Publishes, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)). publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) -> pick2(fun (P, BQSN) -> @@ -218,9 +227,25 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). -%% TODO fix -batch_publish_delivered(_Publishes, _State) -> - exit({not_implemented, {?MODULE, batch_publish_delivered}}). +batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> + PubDict = publishes_by_priority( + Publishes, fun ({Msg, _}) -> Msg end), + {PrioritiesAndAcks, State1} = + lists:foldl( + fun ({Priority, Pubs}, {PriosAndAcks, St}) -> + {PriosAndAcks1, St1} = + pick2(fun (P, BQSN) -> + {AckTags, BQSN1} = + BQ:batch_publish_delivered( + Pubs, ChPid, Flow, BQSN), + {{P, AckTags}, BQSN1} + end, Priority, St), + {[PriosAndAcks1 | PriosAndAcks], St1} + end, {[], State}, orddict:to_list(PubDict)), + {lists:reverse(PrioritiesAndAcks), State1}; +batch_publish_delivered(Publishes, ChPid, Flow, + State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)). %% TODO this is a hack. The BQ api does not give us enough information %% here - if we had the Msg we could look at its priority and forward @@ -541,6 +566,11 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- +publishes_by_priority(Publishes, ExtractMsg) -> + lists:foldl(fun (Pub, Dict) -> + Msg = ExtractMsg(Pub), + rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) + end, orddict:new(), Publishes). priority(P, BQSs) when is_integer(P) -> {P, bq_fetch(P, BQSs)}; @@ -549,18 +579,21 @@ priority(#basic_message{content = Content}, BQSs) -> priority1(_Content, [{P, BQSN}]) -> {P, BQSN}; -priority1(Content = #content{properties = Props}, - [{P, BQSN} | Rest]) -> - #'P_basic'{priority = Priority0} = Props, - Priority = case Priority0 of - undefined -> 0; - _ when is_integer(Priority0) -> Priority0 - end, - case Priority >= P of +priority1(Content, [{P, BQSN} | Rest]) -> + case priority2(Content) >= P of true -> {P, BQSN}; false -> priority1(Content, Rest) end. +priority2(#basic_message{content = Content}) -> + priority2(rabbit_binary_parser:ensure_content_decoded(Content)); +priority2(#content{properties = Props}) -> + #'P_basic'{priority = Priority0} = Props, + case Priority0 of + undefined -> 0; + _ when is_integer(Priority0) -> Priority0 + end. + add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; add_maybe_infinity(A, B) -> A + B. |
