diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-15 14:35:07 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-15 14:35:07 +0000 |
| commit | cd19a0961f775abe1b189bfddd020ac2f7288605 (patch) | |
| tree | 380ac682530c44a662aa32333efdaf6b6dba2a4e | |
| parent | f2bc5ebe36a2ae2ef5c05d722bdac442007bcde5 (diff) | |
| download | rabbitmq-server-git-cd19a0961f775abe1b189bfddd020ac2f7288605.tar.gz | |
Prioritise everything we used to before bug 24966.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7fa04c7abf..fc63f097fc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -23,6 +23,7 @@ -define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS, 0.8). -export([start_link/1, info_keys/0]). @@ -1095,34 +1096,38 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. --define(BIAS, 0.8). - prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; {run_backing_queue, _Mod, _Fun} -> 6; - {notify_sent, _ChPid, _Credit} -> - #q{backing_queue = BQ, backing_queue_state = BQS} = State, - {Ingress, Egress} = BQ:msg_rates(BQS), - case ?BIAS of - B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1; - B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1; - _ -> 0 - end; + {ack, _AckTags, _ChPid} -> consumer_bias(State); + {reject, _AckTags, _Requeue, _ChPid} -> consumer_bias(State); + {notify_sent, _ChPid, _Credit} -> consumer_bias(State); + {resume, _ChPid} -> consumer_bias(State); _ -> 0 end. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + {Ingress, Egress} = BQ:msg_rates(BQS), + case ?CONSUMER_BIAS of + B when B > 0.0 andalso Ingress >= (1.0 - B) * Egress -> +1; + B when B < 0.0 andalso Egress >= (1.0 + B) * Ingress -> -1; + _ -> 0 + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; |
