diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2018-05-13 01:32:39 -0500 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2018-05-13 01:32:39 -0500 |
| commit | f5aa1fbe043395806d9b9ed8780892924431466c (patch) | |
| tree | fced3f8120db59b430a5f0e59c2ad7d7ea269ba3 /src | |
| parent | 08636f98cef491c62bf67d6b865333d65f0d3b9f (diff) | |
| download | rabbitmq-server-git-f5aa1fbe043395806d9b9ed8780892924431466c.tar.gz | |
Take policy-configured max-priority into account
Part of #1590.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 18 |
2 files changed, 20 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a3c8f99519..746679168f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -382,6 +382,7 @@ process_args_policy(State = #q{q = Q, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"max-priority">>, fun res_arg/2, fun init_max_priority/2}, {<<"overflow">>, fun res_arg/2, fun init_overflow/2}, {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( @@ -426,6 +427,9 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_max_priority(_MaxPriority, State) -> + State. + init_overflow(undefined, State) -> State; init_overflow(Overflow, State) -> diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index b1eb83dddc..481fd9a390 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -28,6 +28,8 @@ {requires, pre_boot}, {enables, kernel_ready}]}). +-import(rabbit_misc, [pget/2]). + -export([enable/0]). -export([start/2, stop/1]). @@ -43,6 +45,8 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, handle_info/2]). +-export([max_priority/1, priorities/1]). + -record(state, {bq, bqss, max_priority}). -record(passthrough, {bq, bqs}). @@ -125,9 +129,19 @@ collapse_recovery(QNames, DupNames, Recovery) -> end, dict:new(), lists:zip(DupNames, Recovery)), [dict:fetch(Name, NameToTerms) || Name <- QNames]. -priorities(#amqqueue{arguments = Args}) -> - Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], +max_priority(Q = #amqqueue{arguments = Args}) -> case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of + {Type, RequestedMax} -> {Type, RequestedMax}; + undefined -> + case rabbit_policy:effective_definition(Q) of + undefined -> undefined; + Proplist -> {unsignedbyte, pget(<<"max-priority">>, Proplist)} + end + end. + +priorities(Q) -> + Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], + case max_priority(Q) of {Type, RequestedMax} -> case lists:member(Type, Ints) of false -> none; |
