summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--src/rabbit_channel.erl23
2 files changed, 20 insertions, 6 deletions
diff --git a/Makefile b/Makefile
index dc701b8865..1c2d096a70 100644
--- a/Makefile
+++ b/Makefile
@@ -124,7 +124,8 @@ define PROJECT_ENV
%% either "stop_node" or "continue".
%% by default we choose to not terminate the entire node if one
%% vhost had to shut down, see server#1158 and server#1280
- {vhost_restart_strategy, continue}
+ {vhost_restart_strategy, continue},
+ {default_consumer_prefetch, {false, 0}}
]
endef
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dd1a2036a2..9292d998a9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -386,7 +386,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
true -> flow;
false -> noflow
end,
-
+ {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch),
+ Limiter0 = rabbit_limiter:new(LimiterPid),
+ Limiter = case {Global, Prefetch} of
+ {true, 0} ->
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter0);
+ {true, _} ->
+ rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0);
+ _ ->
+ Limiter0
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -394,7 +403,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = rabbit_limiter:new(LimiterPid),
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0,
+ consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined},
@@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{global = false,
- prefetch_count = PrefetchCount}, _, State) ->
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+ prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ %% Ensures that if default was set, it's overriden
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
prefetch_count = 0},