summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl18
1 files changed, 13 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7982a2fc2d..5918a2b585 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -22,6 +22,7 @@
-define(SYNC_INTERVAL, 200). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster
-export([info_keys/0]).
@@ -968,18 +969,18 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
stat -> 7;
- {basic_consume, _, _, _, _, _, _, _, _, _} -> 1;
- {basic_cancel, _, _, _} -> 1;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2);
+ {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2);
_ -> 0
end.
-prioritise_cast(Msg, _Len, _State) ->
+prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
@@ -987,7 +988,7 @@ prioritise_cast(Msg, _Len, _State) ->
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
- {notify_sent, _ChPid, _Credit} -> 2;
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
_ -> 0
end.
@@ -1003,6 +1004,13 @@ prioritise_cast(Msg, _Len, _State) ->
%% credit to self is hard to reason about. Consumers can continue while
%% reduce_memory_use is in progress.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
+ case BQ:msg_rates(BQS) of
+ {0.0, _} -> Low;
+ {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High;
+ {_, _} -> Low
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;