diff options
| author | Gerhard Lazu <gerhard@users.noreply.github.com> | 2017-10-27 11:57:00 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-10-27 11:57:00 +0100 |
| commit | 7e1f4238cf9f9d6e91ecc260de3166db7d11e218 (patch) | |
| tree | 469182ef4bd662453a9792f35642151c50513dc7 | |
| parent | c06a1f11c81e13d6b420ecec0d1e69f36865d7d7 (diff) | |
| parent | 710733e4c727a8135951ec75ee3c818e6a710e70 (diff) | |
| download | rabbitmq-server-git-7e1f4238cf9f9d6e91ecc260de3166db7d11e218.tar.gz | |
Merge pull request #1407 from rabbitmq/rabbitmq-server-1378-revert
Re-introduce consumer bias & set to 2.0
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7982a2fc2d..5918a2b585 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -22,6 +22,7 @@ -define(SYNC_INTERVAL, 200). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster -export([info_keys/0]). @@ -968,18 +969,18 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _Len, _State) -> +prioritise_call(Msg, _From, _Len, State) -> case Msg of info -> 9; {info, _Items} -> 9; consumers -> 9; stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _, _} -> 1; - {basic_cancel, _, _, _} -> 1; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2); + {basic_cancel, _, _, _} -> consumer_bias(State, 0, 2); _ -> 0 end. -prioritise_cast(Msg, _Len, _State) -> +prioritise_cast(Msg, _Len, State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; @@ -987,7 +988,7 @@ prioritise_cast(Msg, _Len, _State) -> {run_backing_queue, _Mod, _Fun} -> 6; {ack, _AckTags, _ChPid} -> 4; %% [1] {resume, _ChPid} -> 3; - {notify_sent, _ChPid, _Credit} -> 2; + {notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2); _ -> 0 end. @@ -1003,6 +1004,13 @@ prioritise_cast(Msg, _Len, _State) -> %% credit to self is hard to reason about. Consumers can continue while %% reduce_memory_use is in progress. +consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) -> + case BQ:msg_rates(BQS) of + {0.0, _} -> Low; + {Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High; + {_, _} -> Low + end. + prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; |
