summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-15 14:35:07 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-15 14:35:07 +0000
commitcd19a0961f775abe1b189bfddd020ac2f7288605 (patch)
tree380ac682530c44a662aa32333efdaf6b6dba2a4e
parentf2bc5ebe36a2ae2ef5c05d722bdac442007bcde5 (diff)
downloadrabbitmq-server-git-cd19a0961f775abe1b189bfddd020ac2f7288605.tar.gz
Prioritise everything we used to before bug 24966.
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 21 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7fa04c7abf..fc63f097fc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -23,6 +23,7 @@
-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(CONSUMER_BIAS, 0.8).
-export([start_link/1, info_keys/0]).
@@ -1095,34 +1096,38 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
%%----------------------------------------------------------------------------
-prioritise_call(Msg, _From, _Len, _State) ->
+prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
--define(BIAS, 0.8).
-
prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
- {notify_sent, _ChPid, _Credit} ->
- #q{backing_queue = BQ, backing_queue_state = BQS} = State,
- {Ingress, Egress} = BQ:msg_rates(BQS),
- case ?BIAS of
- B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1;
- B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1;
- _ -> 0
- end;
+ {ack, _AckTags, _ChPid} -> consumer_bias(State);
+ {reject, _AckTags, _Requeue, _ChPid} -> consumer_bias(State);
+ {notify_sent, _ChPid, _Credit} -> consumer_bias(State);
+ {resume, _ChPid} -> consumer_bias(State);
_ -> 0
end.
+consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Ingress, Egress} = BQ:msg_rates(BQS),
+ case ?CONSUMER_BIAS of
+ B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1;
+ B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1;
+ _ -> 0
+ end.
+
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
{'DOWN', _, process, DownPid, _} -> 8;