diff options
| author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-05-18 16:18:23 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-05-18 18:07:44 +0100 |
| commit | 1dd9ae5f24b2b4f7e6b99d648d37cfc5216ceee6 (patch) | |
| tree | cc6e2654a0defccf578598a0c60d2b34957e2bfd /src | |
| parent | 8e9ec88fb5ad318cf636770e95bda7c70400a5fb (diff) | |
| download | rabbitmq-server-git-1dd9ae5f24b2b4f7e6b99d648d37cfc5216ceee6.tar.gz | |
Use max priority for higher priorities when returning backing queue state
* Needed in synchronisation, otherwise messages for higher priorities are dropped in the slave
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_priority_queue.erl | 68 |
1 files changed, 31 insertions, 37 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 6141796f7b..a3bfb5cdfa 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). -batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_batch(Publishes), +batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> @@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). -batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_delivered_batch(Publishes), +batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) -> end, {0.0, 0.0}, BQSs); msg_rates(#passthrough{bq = BQ, bqs = BQS}) -> BQ:msg_rates(BQS). - info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) -> fold0(fun (P, BQSN, Acc) -> combine_status(P, BQ:info(backing_queue_status, BQSN), Acc) @@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) -> set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(set_queue_mode(Mode, BQS)). -zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) -> - MsgsByPriority = partition_publish_delivered_batch(Msgs), +zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> + MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), Pubs = orddict:fetch(P, MsgsByPriority), @@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) -> %% For a given thing, just go to its BQ pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}). %% Fold over results fold2(Fun, Acc, State = #state{bqss = BQSs}) -> {Res, BQSs1} = fold2(Fun, Acc, BQSs, []), {Res, a(State#state{bqss = BQSs1})}. + fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) -> {Acc1, BQSN1} = Fun(P, BQSN, Acc), fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]); @@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) -> %% For a given thing, just go to its BQ pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), {Res, BQSN1} = Fun(P, BQSN), {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}. @@ -564,8 +564,7 @@ findfold3(_Fun, Acc, NotFound, [], BQSAcc) -> {NotFound, Acc, lists:reverse(BQSAcc)}. bq_fetch(P, []) -> exit({not_found, P}); -bq_fetch(P, [{P, BQSN} | _]) -> BQSN; -bq_fetch(P, [{P1, BQSN} | _]) when P > P1 -> BQSN; +bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN}; bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T). bq_store(P, BQS, BQSs) -> @@ -583,41 +582,36 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- -partition_publish_batch(Publishes) -> +partition_publish_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _, _}) -> Msg end). + Publishes, fun ({Msg, _, _}) -> Msg end, MaxP). -partition_publish_delivered_batch(Publishes) -> +partition_publish_delivered_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _}) -> Msg end). + Publishes, fun ({Msg, _}) -> Msg end, MaxP). -partition_publishes(Publishes, ExtractMsg) -> +partition_publishes(Publishes, ExtractMsg, MaxP) -> lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) + rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) end, orddict:new(), Publishes). -priority(P, BQSs) when is_integer(P) -> - {P, bq_fetch(P, BQSs)}; -priority(#basic_message{content = Content}, BQSs) -> - priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs). - -priority1(_Content, [{P, BQSN}]) -> - {P, BQSN}; -priority1(Content, [{P, BQSN} | Rest]) -> - case priority2(Content) >= P of - true -> {P, BQSN}; - false -> priority1(Content, Rest) - end. - -priority2(#basic_message{content = Content}) -> - priority2(rabbit_binary_parser:ensure_content_decoded(Content)); -priority2(#content{properties = Props}) -> +priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> + bq_fetch(priority(Priority, MaxP), BQSs). + +%% Messages with a priority which is higher than the queue's maximum are treated +%% as if they were published with the maximum priority. +priority(undefined, _MaxP) -> + 0; +priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP -> + Priority; +priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP -> + MaxP; +priority(#basic_message{content = Content}, MaxP) -> + priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP); +priority(#content{properties = Props}, MaxP) -> #'P_basic'{priority = Priority0} = Props, - case Priority0 of - undefined -> 0; - _ when is_integer(Priority0) -> Priority0 - end. + priority(Priority0, MaxP). add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; |
