summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-05-13 01:32:39 -0500
committerMichael Klishin <michael@clojurewerkz.org>2018-05-13 01:32:39 -0500
commitf5aa1fbe043395806d9b9ed8780892924431466c (patch)
treefced3f8120db59b430a5f0e59c2ad7d7ea269ba3 /src
parent08636f98cef491c62bf67d6b865333d65f0d3b9f (diff)
downloadrabbitmq-server-git-f5aa1fbe043395806d9b9ed8780892924431466c.tar.gz
Take policy-configured max-priority into account
Part of #1590.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_priority_queue.erl18
2 files changed, 20 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a3c8f99519..746679168f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -382,6 +382,7 @@ process_args_policy(State = #q{q = Q,
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
+ {<<"max-priority">>, fun res_arg/2, fun init_max_priority/2},
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
@@ -426,6 +427,9 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
+init_max_priority(_MaxPriority, State) ->
+ State.
+
init_overflow(undefined, State) ->
State;
init_overflow(Overflow, State) ->
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index b1eb83dddc..481fd9a390 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -28,6 +28,8 @@
{requires, pre_boot},
{enables, kernel_ready}]}).
+-import(rabbit_misc, [pget/2]).
+
-export([enable/0]).
-export([start/2, stop/1]).
@@ -43,6 +45,8 @@
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, handle_info/2]).
+-export([max_priority/1, priorities/1]).
+
-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
@@ -125,9 +129,19 @@ collapse_recovery(QNames, DupNames, Recovery) ->
end, dict:new(), lists:zip(DupNames, Recovery)),
[dict:fetch(Name, NameToTerms) || Name <- QNames].
-priorities(#amqqueue{arguments = Args}) ->
- Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint],
+max_priority(Q = #amqqueue{arguments = Args}) ->
case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of
+ {Type, RequestedMax} -> {Type, RequestedMax};
+ undefined ->
+ case rabbit_policy:effective_definition(Q) of
+ undefined -> undefined;
+ Proplist -> {unsignedbyte, pget(<<"max-priority">>, Proplist)}
+ end
+ end.
+
+priorities(Q) ->
+ Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint],
+ case max_priority(Q) of
{Type, RequestedMax} ->
case lists:member(Type, Ints) of
false -> none;