diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 00:02:55 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 00:02:55 +0000 |
| commit | fa4968970f6160afdd64cceb07021a669e51f57c (patch) | |
| tree | 66215b655cd17cc4ac540e71683a0e042fa3ab6a /src | |
| parent | 7bced52c8cffa8b46cf383aad02dfbc0d99bdaaa (diff) | |
| download | rabbitmq-server-git-fa4968970f6160afdd64cceb07021a669e51f57c.tar.gz | |
inline
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 18 |
1 files changed, 7 insertions, 11 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2d49b8b23b..7f9ff827b8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -856,8 +856,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, queue:len(UAMQ)), - {reply, #'basic.qos_ok'{}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; + case ((not rabbit_limiter:is_active(Limiter)) andalso + rabbit_limiter:is_active(Limiter1)) of + true -> rabbit_amqqueue:activate_limit_all( + consumer_queues(State#ch.consumer_mapping), self()); + false -> ok + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1412,15 +1417,6 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -maybe_limit_queues(OldLimiter, NewLimiter, State) -> - case ((not rabbit_limiter:is_active(OldLimiter)) andalso - rabbit_limiter:is_active(NewLimiter)) of - true -> Queues = consumer_queues(State#ch.consumer_mapping), - rabbit_amqqueue:activate_limit_all(Queues, self()); - false -> ok - end, - State. - consumer_queues(Consumers) -> lists:usort([QPid || {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). |
