diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-10-28 01:42:16 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-10-28 01:42:16 +0300 |
| commit | 1972f8922d21bed1a6ad4f2ebc9236cbaae0d6f5 (patch) | |
| tree | 5bf45d9b2b4e272c05dcad52cc01b8174090799b /src | |
| parent | 32c48cd7636885f27c24c60e648fd911ff8acd31 (diff) | |
| parent | 2879048ad6df40b64624f366f72009778fcbcaa8 (diff) | |
| download | rabbitmq-server-git-1972f8922d21bed1a6ad4f2ebc9236cbaae0d6f5.tar.gz | |
Merge pull request #1408 from rabbitmq/rabbitmq-server-1367
Configurable default consumer prefetch for new channels
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 23 |
1 files changed, 18 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dd1a2036a2..563415c459 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -386,7 +386,16 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, true -> flow; false -> noflow end, - + {ok, {Global, Prefetch}} = application:get_env(rabbit, default_consumer_prefetch), + Limiter0 = rabbit_limiter:new(LimiterPid), + Limiter = case {Global, Prefetch} of + {true, 0} -> + rabbit_limiter:unlimit_prefetch(Limiter0); + {true, _} -> + rabbit_limiter:limit_prefetch(Limiter0, Prefetch, 0); + _ -> + Limiter0 + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -394,7 +403,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = rabbit_limiter:new(LimiterPid), + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -414,7 +423,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0, + consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined}, @@ -1248,8 +1257,12 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{global = false, - prefetch_count = PrefetchCount}, _, State) -> - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + %% Ensures that if default was set, it's overriden + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, prefetch_count = 0}, |
