summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-10-28 01:42:16 +0300
committerGitHub <noreply@github.com>2017-10-28 01:42:16 +0300
commit1972f8922d21bed1a6ad4f2ebc9236cbaae0d6f5 (patch)
tree5bf45d9b2b4e272c05dcad52cc01b8174090799b /src
parent32c48cd7636885f27c24c60e648fd911ff8acd31 (diff)
parent2879048ad6df40b64624f366f72009778fcbcaa8 (diff)
downloadrabbitmq-server-git-1972f8922d21bed1a6ad4f2ebc9236cbaae0d6f5.tar.gz
Merge pull request #1408 from rabbitmq/rabbitmq-server-1367
Configurable default consumer prefetch for new channels
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl23
1 files changed, 18 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dd1a2036a2..563415c459 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -386,7 +386,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
true -> flow;
false -> noflow
end,
-
+ {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
+ Limiter0 = rabbit_limiter:new(LimiterPid),
+ Limiter = case {Global, Prefetch} of
+ {true, 0} ->
+ rabbit_limiter:unlimit_prefetch(Limiter0);
+ {true, _} ->
+ rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0);
+ _ ->
+ Limiter0
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -394,7 +403,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = rabbit_limiter:new(LimiterPid),
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0,
+ consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined},
@@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{global = false,
- prefetch_count = PrefetchCount}, _, State) ->
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+ prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ %% Ensures that if default was set, it's overriden
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
prefetch_count = 0},