summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2017-10-27 10:30:29 +0100
committerDiana Corbacho <diana@rabbitmq.com>2017-10-27 12:00:45 +0100
commit50495d6e8cae33e362d7063b5b57aaa3686b2612 (patch)
treedfd9ff62dbef953e430280a4a4490dbc8cd4105e /src
parentcb8de07dbb357edd1202baa0bdc487d80a861285 (diff)
downloadrabbitmq-server-git-50495d6e8cae33e362d7063b5b57aaa3686b2612.tar.gz
Configurable default consumer prefetch for new channels
rabbitmq-server#1367 [#151216783]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl23
1 files changed, 18 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dd1a2036a2..9292d998a9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -386,7 +386,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
true -> flow;
false -> noflow
end,
-
+ {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
+ Limiter0 = rabbit_limiter:new(LimiterPid),
+ Limiter = case {Global, Prefetch} of
+ {true, 0} ->
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter0);
+ {true, _} ->
+ rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0);
+ _ ->
+ Limiter0
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -394,7 +403,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = rabbit_limiter:new(LimiterPid),
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0,
+ consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined},
@@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{global = false,
- prefetch_count = PrefetchCount}, _, State) ->
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+ prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ %% Ensures that if default was set, it's overriden
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
prefetch_count = 0},