diff options
| -rw-r--r-- | Makefile | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 23 |
2 files changed, 20 insertions, 6 deletions
@@ -124,7 +124,8 @@ define PROJECT_ENV %% either "stop_node" or "continue". %% by default we choose to not terminate the entire node if one %% vhost had to shut down, see server#1158 and server#1280 - {vhost_restart_strategy, continue} + {vhost_restart_strategy, continue}, + {default_consumer_prefetch, {false, 0}} ] endef diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dd1a2036a2..9292d998a9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -386,7 +386,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, true -> flow; false -> noflow end, - + {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), + Limiter0 = rabbit_limiter:new(LimiterPid), + Limiter = case {Global, Prefetch} of + {true, 0} -> + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter0); + {true, _} -> + rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0); + _ -> + Limiter0 + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -394,7 +403,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = rabbit_limiter:new(LimiterPid), + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0, + consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined}, @@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{global = false, - prefetch_count = PrefetchCount}, _, State) -> - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + %% Ensures that if default was set, it's overriden + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, prefetch_count = 0}, |
