diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 14:19:36 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-07-26 14:19:36 +0100 |
| commit | 55c3848942bfe29b1a1e24e1dab5487685d8aaea (patch) | |
| tree | 03aaacae81b860749c90dba5a8d7342aee523e7a /src | |
| parent | 7b1a3833deca325b94a958bd647a391e371fb949 (diff) | |
| download | rabbitmq-server-git-55c3848942bfe29b1a1e24e1dab5487685d8aaea.tar.gz | |
unbreak amqqueue_process/limiter logic
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 |
2 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f44f5fece4..493e6d243b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -449,10 +449,10 @@ limit_all(QPids, ChPid, Limiter) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index aa2fb0f497..b740137372 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1072,15 +1072,17 @@ handle_cast({limit, ChPid, Limiter}, State) -> State, ChPid, fun (C = #cr{consumer_count = ConsumerCount, limiter = OldLimiter, - is_limit_active = Limited}) -> - if ConsumerCount =/= 0 -> + is_limit_active = OldLimited}) -> + case {ConsumerCount =/= 0, + not rabbit_limiter:is_enabled(OldLimiter)} of + {true, true} -> ok = rabbit_limiter:register(Limiter, self()); - true -> + {_, _} -> ok end, - NewLimited = Limited, - C#cr{limiter = Limiter, - is_limit_active = NewLimited} + Limited = + OldLimited andalso rabbit_limiter:is_enabled(Limiter), + C#cr{limiter = Limiter, is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> |
