summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-05-18 16:18:23 +0100
committerDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-05-18 18:07:44 +0100
commit1dd9ae5f24b2b4f7e6b99d648d37cfc5216ceee6 (patch)
treecc6e2654a0defccf578598a0c60d2b34957e2bfd /src
parent8e9ec88fb5ad318cf636770e95bda7c70400a5fb (diff)
downloadrabbitmq-server-git-1dd9ae5f24b2b4f7e6b99d648d37cfc5216ceee6.tar.gz
Use max priority for higher priorities when returning backing queue state
* Needed in synchronisation, otherwise messages for higher priorities are dropped in the slave
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_priority_queue.erl68
1 files changed, 31 insertions, 37 deletions
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 6141796f7b..a3bfb5cdfa 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
-batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_batch(Publishes),
+batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_batch(Publishes, MaxP),
lists:foldl(
fun ({Priority, Pubs}, St) ->
pick1(fun (_P, BQSN) ->
@@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
-batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) ->
- PubDict = partition_publish_delivered_batch(Publishes),
+batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) ->
+ PubDict = partition_publish_delivered_batch(Publishes, MaxP),
{PrioritiesAndAcks, State1} =
lists:foldl(
fun ({Priority, Pubs}, {PriosAndAcks, St}) ->
@@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) ->
end, {0.0, 0.0}, BQSs);
msg_rates(#passthrough{bq = BQ, bqs = BQS}) ->
BQ:msg_rates(BQS).
-
info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
fold0(fun (P, BQSN, Acc) ->
combine_status(P, BQ:info(backing_queue_status, BQSN), Acc)
@@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) ->
set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(set_queue_mode(Mode, BQS)).
-zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) ->
- MsgsByPriority = partition_publish_delivered_batch(Msgs),
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) ->
+ MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP),
lists:foldl(fun (Acks, MAs) ->
{P, _AckTag} = hd(Acks),
Pubs = orddict:fetch(P, MsgsByPriority),
@@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) ->
%% For a given thing, just go to its BQ
pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}).
%% Fold over results
fold2(Fun, Acc, State = #state{bqss = BQSs}) ->
{Res, BQSs1} = fold2(Fun, Acc, BQSs, []),
{Res, a(State#state{bqss = BQSs1})}.
+
fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) ->
{Acc1, BQSN1} = Fun(P, BQSN, Acc),
fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]);
@@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) ->
%% For a given thing, just go to its BQ
pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
- {P, BQSN} = priority(Prioritisable, BQSs),
+ {P, BQSN} = priority_bq(Prioritisable, BQSs),
{Res, BQSN1} = Fun(P, BQSN),
{Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}.
@@ -564,8 +564,7 @@ findfold3(_Fun, Acc, NotFound, [], BQSAcc) ->
{NotFound, Acc, lists:reverse(BQSAcc)}.
bq_fetch(P, []) -> exit({not_found, P});
-bq_fetch(P, [{P, BQSN} | _]) -> BQSN;
-bq_fetch(P, [{P1, BQSN} | _]) when P > P1 -> BQSN;
+bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN};
bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T).
bq_store(P, BQS, BQSs) ->
@@ -583,41 +582,36 @@ a(State = #state{bqss = BQSs}) ->
end.
%%----------------------------------------------------------------------------
-partition_publish_batch(Publishes) ->
+partition_publish_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _, _}) -> Msg end).
+ Publishes, fun ({Msg, _, _}) -> Msg end, MaxP).
-partition_publish_delivered_batch(Publishes) ->
+partition_publish_delivered_batch(Publishes, MaxP) ->
partition_publishes(
- Publishes, fun ({Msg, _}) -> Msg end).
+ Publishes, fun ({Msg, _}) -> Msg end, MaxP).
-partition_publishes(Publishes, ExtractMsg) ->
+partition_publishes(Publishes, ExtractMsg, MaxP) ->
lists:foldl(fun (Pub, Dict) ->
Msg = ExtractMsg(Pub),
- rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict)
+ rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict)
end, orddict:new(), Publishes).
-priority(P, BQSs) when is_integer(P) ->
- {P, bq_fetch(P, BQSs)};
-priority(#basic_message{content = Content}, BQSs) ->
- priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs).
-
-priority1(_Content, [{P, BQSN}]) ->
- {P, BQSN};
-priority1(Content, [{P, BQSN} | Rest]) ->
- case priority2(Content) >= P of
- true -> {P, BQSN};
- false -> priority1(Content, Rest)
- end.
-
-priority2(#basic_message{content = Content}) ->
- priority2(rabbit_binary_parser:ensure_content_decoded(Content));
-priority2(#content{properties = Props}) ->
+priority_bq(Priority, [{MaxP, _} | _] = BQSs) ->
+ bq_fetch(priority(Priority, MaxP), BQSs).
+
+%% Messages with a priority which is higher than the queue's maximum are treated
+%% as if they were published with the maximum priority.
+priority(undefined, _MaxP) ->
+ 0;
+priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP ->
+ Priority;
+priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP ->
+ MaxP;
+priority(#basic_message{content = Content}, MaxP) ->
+ priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP);
+priority(#content{properties = Props}, MaxP) ->
#'P_basic'{priority = Priority0} = Props,
- case Priority0 of
- undefined -> 0;
- _ when is_integer(Priority0) -> Priority0
- end.
+ priority(Priority0, MaxP).
add_maybe_infinity(infinity, _) -> infinity;
add_maybe_infinity(_, infinity) -> infinity;